java併發編程學習之再談ReentrantReadWriteLock

以前ReentrantReadWriteLock講了讀寫鎖的場景,這邊來說他的源碼,以非公平鎖爲例,其實和公平鎖主要代碼是一致的。segmentfault

Sync類

static final int SHARED_SHIFT   = 16;//高16位是共享,用於讀,低16位是獨佔,用於寫,用一個字段保證原子性
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//左移16位,也就是高位的最後一個是0000 0000 0000 0001 0000 0000 0000 0000
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//最大讀的數量,正常不會這麼多
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//左移16位,再減1,也就是0000 0000 0000 0000 1111 1111 1111 1111
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }//無符號右移16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//返回的不爲1,說明有寫鎖,由於低16位都是1,1與1爲1,若是有些,確定有個爲1
static final class HoldCounter {//每一個線程持有的鎖的數量
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {//本地線程
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
private transient ThreadLocalHoldCounter readHolds;//本地線程,記錄持有的鎖的數量信息
private transient HoldCounter cachedHoldCounter;//緩存HoldCounter 的數據
private transient Thread firstReader = null;//第一個獲取讀鎖的線程
private transient int firstReaderHoldCount;//第一個獲取讀鎖的線程持有的數量

讀鎖的lock方法

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//小於0沒獲取到鎖
        doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();//獲取本地線程
    int c = getState();//獲取state的值
    if (exclusiveCount(c) != 0 &&//不爲0說明有寫鎖,緣由上面分析了
        getExclusiveOwnerThread() != current)//不是當前線程,說明不是重入
        return -1;
    int r = sharedCount(c);//獲取讀鎖的個數
    if (!readerShouldBlock() &&//讀鎖無堵塞
        r < MAX_COUNT &&//讀鎖沒到最大值
        compareAndSetState(c, c + SHARED_UNIT)) {//cas操做,高位加1成功說明獲取到了讀鎖
        if (r == 0) {//等於0說明第一個獲取讀鎖
            firstReader = current;//當前線程就是第一個
            firstReaderHoldCount = 1;//數量爲1
        } else if (firstReader == current) {//若是不是第一個,可是是當前線程
            firstReaderHoldCount++;//數量加1
        } else {//既不是第一個,也不是當前線程
            HoldCounter rh = cachedHoldCounter;//獲取緩存HoldCounter 
            if (rh == null || rh.tid != getThreadId(current))//若是不爲空,或者經過線程id對比不是當前線程
                cachedHoldCounter = rh = readHolds.get();//緩存設置爲當前線程
            else if (rh.count == 0)//緩存的是當前線程,並且鎖的數量爲0,加入到本地緩存,若是數量不爲0,說明已經在本地緩存了
                readHolds.set(rh);
            rh.count++;//鎖的數量加1
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
//若是阻塞或者cas失敗的狀況,再重試獲取鎖
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {//
        int c = getState();
        if (exclusiveCount(c) != 0) {//上面分析了,若是是寫鎖,而且不是當前線程,放棄
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {//阻塞的狀況
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {//當前線程是第一個不處理
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();//若是是0,移除本地緩存
                    }
                }
                if (rh.count == 0)
                    return -1;//
            }
        }
        if (sharedCount(c) == MAX_COUNT)//讀鎖數量太大,拋異常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {//沒有讀鎖
                firstReader = current;//當前線程就是第一個
                firstReaderHoldCount = 1;//數量爲1
            } else if (firstReader == current) {//若是不是第一個,可是是當前線程
                firstReaderHoldCount++;//數量加1
            } else {//既不是第一個,也不是當前線程
                if (rh == null)
                    rh = cachedHoldCounter;//獲取緩存HoldCounter 
                if (rh == null || rh.tid != getThreadId(current))//若是不爲空,或者經過線程id對比不是當前線程
                    rh = readHolds.get();//緩存設置爲當前線程
                else if (rh.count == 0)//緩存的是當前線程,並且鎖的數量爲0,加入到本地緩存,若是數量不爲0,說明已經在本地緩存了
                    readHolds.set(rh);
                rh.count++;//鎖的數量加1
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

讀鎖的unlock方法

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//若是第一個是當前線程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)//若是數量爲1,就是直接設爲空
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {//數量小於等於1,移除
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;//寫鎖和讀鎖爲0,無鎖
    }
}

寫鎖的lock方法

public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//失敗了就進入阻塞隊列
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();//獲取當前線程
    int c = getState();//獲取state
    int w = exclusiveCount(c);//不爲0說明有寫鎖
    if (c != 0) {//有讀或者寫鎖
        // 無寫鎖或者讀鎖被佔
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||//無阻塞
        !compareAndSetState(c, c + acquires))//設置成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

寫鎖的unlock方法

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//喚醒
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())//不是獨佔鎖,拋異常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;//寫鎖都釋放了
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
相關文章
相關標籤/搜索