本文介紹一種高效的,頗有價值的無等待隊列實現 fast wait-free queue。
它基於FAA(fetch-and-add)+CAS(compare-and-swap)來實現。第一步的FAA避開高度競爭狀況下第一個CAS形成的競爭。
同時它還提供最強的wait-freedom保證progress。node
我我的給它提供了Java的一個實現:WaitFreeQueueForJavagit
首先:它包含以下5種數據結構。github
對於一個cell,它的初始值爲BOT,因此第一步是:數據結構
接着,假如enqueue Thread成功,條件變爲:cell->value==val, 那麼enqueue就完成操做,結束對該cell的訪問。因此接下來dequeue Thread因爲可能存在競爭,須要把cell的deq原子地改變爲TOP,從而結束操做:dom
因此如今val已經被Thread ONE或者Thread TWO獲取了。fetch
可是,假如STEP ONE中的enqueue操做失敗了呢?那麼會怎麼樣呢?
此時,cell->val == TOP,作法是enqueue保留該cell的index,若是在嘗試次數內就再次獲取一個新的cell來嘗試。
不然就構造本身的enq Request,改成將enq Request塞入新的cell.
而dequeue則在將val變爲TOP以後,查看該cell的enq Request,若是已經有了,則嘗試配對這個enq Request和該cell(enq.id == cell.index)。若是沒有,則會從線程handle_t構成的ring中抽取下一個enq Request,接着嘗試將該enq Request塞入該cell。若是沒有enq Request則將該cell的enq改成TOP。若是去到enq Request則嘗試將它配對(enq.id == cell.index)。
不管如何,該cell一定要被消耗掉:spa
以後,一樣的dequeue經過競爭cell->deq來獲取val。此過程如圖下:
Atomic: cell->enq => ENQ線程
Atomic: cell->enq-id => (-cell->index)code
好,到此爲止,enqueue操做相關的全部操做都已走完。隊列
另外一方面,假如dequeue操做通過屢次還未成功,那麼它就會將該deq request放入線程的線程ring環裏面。
當其餘線程dequeue成功以後,就會順着線程ring環去獲取協助該deq。
他們再也不把deq直接變爲TOP,而是變爲指向那個deq Request。
而且經過將deq的idx先指向選定的cell,而後改變idx的值爲負,從而通知deq request的線程操做已完成。
如圖所示:
它的enqueue代碼以下:
void enqueue(queue_t *q, handle_t *th, void *v) { th->hzd_node_id = th->enq_node_id; long id; int p = MAX_PATIENCE; while (!enq_fast(q, th, v, &id) && p-- > 0) ; if (p < 0) enq_slow(q, th, v, id); th->enq_node_id = th->Ep->id; RELEASE(&th->hzd_node_id, -1); } static int enq_fast(queue_t *q, handle_t *th, void *v, long *id) { long i = FAAcs(&q->Ei, 1); cell_t *c = find_cell(&th->Ep, i, th); void *cv = BOT; if (CAS(&c->val, &cv, v)) { #ifdef RECORD th->fastenq++; #endif return 1; } else { *id = i; return 0; } } static void enq_slow(queue_t *q, handle_t *th, void *v, long id) { enq_t *enq = &th->Er; enq->val = v; RELEASE(&enq->id, id); node_t *tail = th->Ep; long i; cell_t *c; do { i = FAA(&q->Ei, 1); c = find_cell(&tail, i, th); enq_t *ce = BOT; if (CAScs(&c->enq, &ce, enq) && c->val != TOP) { if (CAS(&enq->id, &id, -i)) id = -i; break; } } while (enq->id > 0); id = -enq->id; c = find_cell(&th->Ep, id, th); if (id > i) { long Ei = q->Ei; while (Ei <= id && !CAS(&q->Ei, &Ei, id + 1)) ; } c->val = v; #ifdef RECORD th->slowenq++; #endif } static void *help_enq(queue_t *q, handle_t *th, cell_t *c, long i) { void *v = spin(&c->val); if ((v != TOP && v != BOT) || (v == BOT && !CAScs(&c->val, &v, TOP) && v != TOP)) { return v; } enq_t *e = c->enq; if (e == BOT) { handle_t *ph; enq_t *pe; long id; ph = th->Eh, pe = &ph->Er, id = pe->id; if (th->Ei != 0 && th->Ei != id) { th->Ei = 0; th->Eh = ph->next; ph = th->Eh, pe = &ph->Er, id = pe->id; } if (id > 0 && id <= i && !CAS(&c->enq, &e, pe)) th->Ei = id; else th->Eh = ph->next; if (e == BOT && CAS(&c->enq, &e, TOP)) e = TOP; } if (e == TOP) return (q->Ei <= i ? BOT : TOP); long ei = ACQUIRE(&e->id); void *ev = ACQUIRE(&e->val); if (ei > i) { if (c->val == TOP && q->Ei <= i) return BOT; } else { if ((ei > 0 && CAS(&e->id, &ei, -i)) || (ei == -i && c->val == TOP)) { long Ei = q->Ei; while (Ei <= i && !CAS(&q->Ei, &Ei, i + 1)) ; c->val = ev; } } return c->val; }
dequeue代碼以下:
void *dequeue(queue_t *q, handle_t *th) { th->hzd_node_id = th->deq_node_id; void *v; long id = 0; int p = MAX_PATIENCE; do v = deq_fast(q, th, &id); while (v == TOP && p-- > 0); if (v == TOP) v = deq_slow(q, th, id); else { #ifdef RECORD th->fastdeq++; #endif } if (v != EMPTY) { help_deq(q, th, th->Dh); th->Dh = th->Dh->next; } th->deq_node_id = th->Dp->id; RELEASE(&th->hzd_node_id, -1); if (th->spare == NULL) { cleanup(q, th); th->spare = new_node(); } #ifdef RECORD if (v == EMPTY) th->empty++; #endif return v; } static void *deq_fast(queue_t *q, handle_t *th, long *id) { long i = FAAcs(&q->Di, 1); cell_t *c = find_cell(&th->Dp, i, th); void *v = help_enq(q, th, c, i); deq_t *cd = BOT; if (v == BOT) return BOT; if (v != TOP && CAS(&c->deq, &cd, TOP)) return v; *id = i; return TOP; } static void *deq_slow(queue_t *q, handle_t *th, long id) { deq_t *deq = &th->Dr; RELEASE(&deq->id, id); RELEASE(&deq->idx, id); help_deq(q, th, th); long i = -deq->idx; cell_t *c = find_cell(&th->Dp, i, th); void *val = c->val; #ifdef RECORD th->slowdeq++; #endif return val == TOP ? BOT : val; } static void help_deq(queue_t *q, handle_t *th, handle_t *ph) { deq_t *deq = &ph->Dr; long idx = ACQUIRE(&deq->idx); long id = deq->id; if (idx < id) return; node_t *Dp = ph->Dp; th->hzd_node_id = ph->hzd_node_id; FENCE(); idx = deq->idx; long i = id + 1, old = id, new = 0; while (1) { node_t *h = Dp; for (; idx == old && new == 0; ++i) { cell_t *c = find_cell(&h, i, th); long Di = q->Di; while (Di <= i && !CAS(&q->Di, &Di, i + 1)) ; void *v = help_enq(q, th, c, i); if (v == BOT || (v != TOP && c->deq == BOT)) new = i; else idx = ACQUIRE(&deq->idx); } if (new != 0) { if (CASra(&deq->idx, &idx, new)) idx = new; if (idx >= new) new = 0; } if (idx < 0 || deq->id != id) break; cell_t *c = find_cell(&Dp, idx, th); deq_t *cd = BOT; if (c->val == TOP || CAS(&c->deq, &cd, deq) || cd == deq) { CAS(&deq->idx, &idx, -idx); break; } old = idx; if (idx >= i) i = idx + 1; } }