Fast Wait-free Queue

本文介紹一種高效的,頗有價值的無等待隊列實現 fast wait-free queue
它基於FAA(fetch-and-add)+CAS(compare-and-swap)來實現。第一步的FAA避開高度競爭狀況下第一個CAS形成的競爭。
同時它還提供最強的wait-freedom保證progress。node

我我的給它提供了Java的一個實現:WaitFreeQueueForJavagit

首先:它包含以下5種數據結構。
圖片描述github

對於一個cell,它的初始值爲BOT,因此第一步是:
圖片描述數據結構

接着,假如enqueue Thread成功,條件變爲:cell->value==val, 那麼enqueue就完成操做,結束對該cell的訪問。因此接下來dequeue Thread因爲可能存在競爭,須要把cell的deq原子地改變爲TOP,從而結束操做:
圖片描述dom

因此如今val已經被Thread ONE或者Thread TWO獲取了。fetch

可是,假如STEP ONE中的enqueue操做失敗了呢?那麼會怎麼樣呢?
此時,cell->val == TOP,作法是enqueue保留該cell的index,若是在嘗試次數內就再次獲取一個新的cell來嘗試。
不然就構造本身的enq Request,改成將enq Request塞入新的cell.
而dequeue則在將val變爲TOP以後,查看該cell的enq Request,若是已經有了,則嘗試配對這個enq Request和該cell(enq.id == cell.index)。若是沒有,則會從線程handle_t構成的ring中抽取下一個enq Request,接着嘗試將該enq Request塞入該cell。若是沒有enq Request則將該cell的enq改成TOP。若是去到enq Request則嘗試將它配對(enq.id == cell.index)
不管如何,該cell一定要被消耗掉:spa

  • 要麼由於cell->enq變爲TOP從而消耗掉。
  • 要麼由於cell->enq變爲了某個enq request,可是和該request沒有配對成功,從而消耗掉。
  • 要麼cell->enq變爲了某個enq request,而且和該request配對成功,從而cell->val得到某個該request的值。

以後,一樣的dequeue經過競爭cell->deq來獲取val。此過程如圖下:
Atomic: cell->enq => ENQ
圖片描述線程

Atomic: cell->enq-id => (-cell->index)
圖片描述code

好,到此爲止,enqueue操做相關的全部操做都已走完。隊列

另外一方面,假如dequeue操做通過屢次還未成功,那麼它就會將該deq request放入線程的線程ring環裏面。
當其餘線程dequeue成功以後,就會順着線程ring環去獲取協助該deq。
他們再也不把deq直接變爲TOP,而是變爲指向那個deq Request。
而且經過將deq的idx先指向選定的cell,而後改變idx的值爲負,從而通知deq request的線程操做已完成。
如圖所示:
圖片描述

它的enqueue代碼以下:

void enqueue(queue_t *q, handle_t *th, void *v) {
    th->hzd_node_id = th->enq_node_id;

    long id;
    int p = MAX_PATIENCE;
    while (!enq_fast(q, th, v, &id) && p-- > 0)
        ;
    if (p < 0) enq_slow(q, th, v, id);

    th->enq_node_id = th->Ep->id;
    RELEASE(&th->hzd_node_id, -1);
}

static int enq_fast(queue_t *q, handle_t *th, void *v, long *id) {
    long i = FAAcs(&q->Ei, 1);
    cell_t *c = find_cell(&th->Ep, i, th);
    void *cv = BOT;

    if (CAS(&c->val, &cv, v)) {
#ifdef RECORD
        th->fastenq++;
#endif
        return 1;
    } else {
        *id = i;
        return 0;
    }
}

static void enq_slow(queue_t *q, handle_t *th, void *v, long id) {
    enq_t *enq = &th->Er;
    enq->val = v;
    RELEASE(&enq->id, id);

    node_t *tail = th->Ep;
    long i;
    cell_t *c;

    do {
        i = FAA(&q->Ei, 1);
        c = find_cell(&tail, i, th);
        enq_t *ce = BOT;

        if (CAScs(&c->enq, &ce, enq) && c->val != TOP) {
            if (CAS(&enq->id, &id, -i)) id = -i;
            break;
        }
    } while (enq->id > 0);

    id = -enq->id;
    c = find_cell(&th->Ep, id, th);
    if (id > i) {
        long Ei = q->Ei;
        while (Ei <= id && !CAS(&q->Ei, &Ei, id + 1))
            ;
    }
    c->val = v;

#ifdef RECORD
    th->slowenq++;
#endif
}

static void *help_enq(queue_t *q, handle_t *th, cell_t *c, long i) {
    void *v = spin(&c->val);

    if ((v != TOP && v != BOT) ||
        (v == BOT && !CAScs(&c->val, &v, TOP) && v != TOP)) {
        return v;
    }

    enq_t *e = c->enq;

    if (e == BOT) {
        handle_t *ph;
        enq_t *pe;
        long id;
        ph = th->Eh, pe = &ph->Er, id = pe->id;

        if (th->Ei != 0 && th->Ei != id) {
            th->Ei = 0;
            th->Eh = ph->next;
            ph = th->Eh, pe = &ph->Er, id = pe->id;
        }

        if (id > 0 && id <= i && !CAS(&c->enq, &e, pe))
            th->Ei = id;
        else
            th->Eh = ph->next;

        if (e == BOT && CAS(&c->enq, &e, TOP)) e = TOP;
    }

    if (e == TOP) return (q->Ei <= i ? BOT : TOP);

    long ei = ACQUIRE(&e->id);
    void *ev = ACQUIRE(&e->val);

    if (ei > i) {
        if (c->val == TOP && q->Ei <= i) return BOT;
    } else {
        if ((ei > 0 && CAS(&e->id, &ei, -i)) || (ei == -i && c->val == TOP)) {
            long Ei = q->Ei;
            while (Ei <= i && !CAS(&q->Ei, &Ei, i + 1))
                ;
            c->val = ev;
        }
    }

    return c->val;
}

dequeue代碼以下:

void *dequeue(queue_t *q, handle_t *th) {
    th->hzd_node_id = th->deq_node_id;

    void *v;
    long id = 0;
    int p = MAX_PATIENCE;

    do
        v = deq_fast(q, th, &id);
    while (v == TOP && p-- > 0);
    if (v == TOP)
        v = deq_slow(q, th, id);
    else {
#ifdef RECORD
        th->fastdeq++;
#endif
    }

    if (v != EMPTY) {
        help_deq(q, th, th->Dh);
        th->Dh = th->Dh->next;
    }

    th->deq_node_id = th->Dp->id;
    RELEASE(&th->hzd_node_id, -1);

    if (th->spare == NULL) {
        cleanup(q, th);
        th->spare = new_node();
    }

#ifdef RECORD
    if (v == EMPTY) th->empty++;
#endif
    return v;
}

static void *deq_fast(queue_t *q, handle_t *th, long *id) {
    long i = FAAcs(&q->Di, 1);
    cell_t *c = find_cell(&th->Dp, i, th);
    void *v = help_enq(q, th, c, i);
    deq_t *cd = BOT;

    if (v == BOT) return BOT;
    if (v != TOP && CAS(&c->deq, &cd, TOP)) return v;

    *id = i;
    return TOP;
}

static void *deq_slow(queue_t *q, handle_t *th, long id) {
    deq_t *deq = &th->Dr;
    RELEASE(&deq->id, id);
    RELEASE(&deq->idx, id);

    help_deq(q, th, th);
    long i = -deq->idx;
    cell_t *c = find_cell(&th->Dp, i, th);
    void *val = c->val;

#ifdef RECORD
    th->slowdeq++;
#endif
    return val == TOP ? BOT : val;
}

static void help_deq(queue_t *q, handle_t *th, handle_t *ph) {
    deq_t *deq = &ph->Dr;
    long idx = ACQUIRE(&deq->idx);
    long id = deq->id;

    if (idx < id) return;

    node_t *Dp = ph->Dp;
    th->hzd_node_id = ph->hzd_node_id;
    FENCE();
    idx = deq->idx;

    long i = id + 1, old = id, new = 0;
    while (1) {
        node_t *h = Dp;
        for (; idx == old && new == 0; ++i) {
            cell_t *c = find_cell(&h, i, th);

            long Di = q->Di;
            while (Di <= i && !CAS(&q->Di, &Di, i + 1))
                ;

            void *v = help_enq(q, th, c, i);
            if (v == BOT || (v != TOP && c->deq == BOT))
                new = i;
            else
                idx = ACQUIRE(&deq->idx);
        }

        if (new != 0) {
            if (CASra(&deq->idx, &idx, new)) idx = new;
            if (idx >= new) new = 0;
        }

        if (idx < 0 || deq->id != id) break;

        cell_t *c = find_cell(&Dp, idx, th);
        deq_t *cd = BOT;
        if (c->val == TOP || CAS(&c->deq, &cd, deq) || cd == deq) {
            CAS(&deq->idx, &idx, -idx);
            break;
        }

        old = idx;
        if (idx >= i) i = idx + 1;
    }
}
相關文章
相關標籤/搜索