java併發:線程併發控制機制之ReadWriteLock

ReadWriteLock

Java中的ReadWriteLock是什麼?java

通常而言,讀寫鎖是用來提高併發程序性能的鎖分離技術的成果,Java中的ReadWriteLock是Java5中新增的一個接口,提供了readLock和writeLock兩種鎖機制。數據庫

一個ReadWriteLock維護一對關聯的鎖,一個用於只讀操做,一個用於寫;在沒有寫線程的狀況下,一個讀鎖可能會同時被多個讀線程持有,寫鎖是獨佔的。緩存

咱們來看一下ReadWriteLock的源碼:數據結構

public interface ReadWriteLock{
    Lock readLock();
    Lock writeLock();
}

解讀:多線程

從源碼上面咱們能夠看出來ReadWriteLock並非Lock的子接口,只不過ReadWriteLock藉助Lock來實現讀寫兩個鎖並存、互斥的操做機制。併發

在ReadWriteLock中每次讀取共享數據時須要讀取鎖,當修改共享數據時須要寫入鎖,這看起來好像是兩個鎖,可是並不是如此。app

ReentrantReadWriteLock

ReentrantReadWriteLock是ReadWriteLock在java.util裏面惟一的實現類,主要使用場景是當有不少線程都從某個數據結構中讀取數據,而不多有線程對其進行修改。ide

示例

例一

ReadLock和WriteLock單獨使用的狀況函數

package demo.thread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    public static void main(String[] args) {
        final Count ct = new Count();
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.get();
                }
            }.start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.put();
                }
            }.start();
        }
    }
}

class Count {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get() {
        rwl.readLock().lock();// 上讀鎖,其餘線程只能讀不能寫,具備高併發性
        try {
            System.out.println(Thread.currentThread().getName() + " read start.");
            Thread.sleep(1000L);// 模擬幹活
            System.out.println(Thread.currentThread().getName() + "read end.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.readLock().unlock(); // 釋放寫鎖,最好放在finnaly裏面
        }
    }

    public void put() {
        rwl.writeLock().lock();// 上寫鎖,具備阻塞性
        try {
            System.out.println(Thread.currentThread().getName() + " write start.");
            Thread.sleep(1000L);// 模擬幹活
            System.out.println(Thread.currentThread().getName() + "write end.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.writeLock().unlock(); // 釋放寫鎖,最好放在finnaly裏面
        }
    }

}

 

運行結果以下:高併發

Thread-1 read start.
Thread-0 read start.
Thread-1read end.
Thread-0read end.
Thread-3 write start.
Thread-3write end.
Thread-2 write start.
Thread-2write end.

從結果上面能夠看的出來,讀的時候是併發的,寫的時候是有順序的帶阻塞機制的

 

例二

ReadLock和WriteLock的複雜使用狀況,模擬一個有讀寫數據的場景

private final Map<String, Object> map = new HashMap<String, Object>();// 假設這裏面存了數據緩存
private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();

public Object readWrite(String id) {
      Object value = null;
      rwlock.readLock().lock();// 首先開啓讀鎖,從緩存中去取
      try {
            value = map.get(id);
            if (value == null) { // 若是緩存中沒有數據,釋放讀鎖,上寫鎖
                rwlock.readLock().unlock();
                rwlock.writeLock().lock();
                try {
                    if (value == null) {
                        value = "aaa"; // 此時能夠去數據庫中查找,這裏簡單的模擬一下
                    }
                } finally {
                    rwlock.writeLock().unlock(); // 釋放寫鎖
                }
                rwlock.readLock().lock(); // 而後再上讀鎖
            }
        } finally {
            rwlock.readLock().unlock(); // 最後釋放讀鎖
        }
      return value;
}

解讀:

請必定要注意讀寫鎖的獲取與釋放順序。

源碼解讀

類圖以下:

 

其構造函數以下:

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * default (nonfair) ordering properties.
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

解讀:

讀寫鎖的內部維護了一個 ReadLock 和一個 WriteLock,它們依賴 Sync 實現具體功能;Sync繼承自 AQS,提供了公平和非公平的實現。

 

重點說明:

AQS 中只維護了一個 state 狀態,而 ReentrantReadWriteLock 須要維護讀狀態和寫狀態,一個 state 怎麼表示寫和讀兩種狀態呢?

ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示讀狀態,也即獲取到讀鎖的次數;使用低 16 位表示獲取到寫鎖的線程的可重入次數。

寫鎖

寫鎖使用 WriteLock 來實現

獲取鎖

對應代碼以下:

        /**
         * Acquires the write lock.
         *
         * <p>Acquires the write lock if neither the read nor write lock
         * are held by another thread
         * and returns immediately, setting the write lock hold count to
         * one.
         *
         * <p>If the current thread already holds the write lock then the
         * hold count is incremented by one and the method returns
         * immediately.
         *
         * <p>If the lock is held by another thread then the current
         * thread becomes disabled for thread scheduling purposes and
         * lies dormant until the write lock has been acquired, at which
         * time the write lock hold count is set to one.
         */
        public void lock() {
            sync.acquire(1);
        }

解讀:

相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的acquire方法,代碼以下:

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

重點關注子類中tryAcquire的實現

 

從上圖可知,tryAcquire方法定義在Sync中,具體代碼以下:

        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

解讀:

寫鎖是個獨佔鎖,某個時刻只有一個線程能夠獲取該鎖。

若是當前沒有線程獲取到讀鎖和寫鎖,則當前線程能夠獲取到寫鎖而後返回;若是當前己經有線程獲取到讀鎖或者寫鎖,則當前請求寫鎖的線程會被阻塞掛起。

對應上述代碼分支:

(1)AQS狀態值不爲0

  • 若是w==0則說明狀態值的低16位爲0,因爲其高 16位不爲 0,即代表己經有線程獲取到讀鎖,因此直接返回 false。
  • 若是w!=0則說明已經有線程獲取到寫鎖,若當前線程不是該鎖的持有者,則返回 false。

(2)AQS狀態值爲0

說明目前沒有線程獲取到讀鎖和寫鎖,對於 writerShouldBlock 方法,由Sync子類實現,公平鎖和非公平鎖的具體實現不同。

    /**
     * Nonfair version of Sync
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
    }

解讀:

對於非公平鎖來講老是返回false,故若是搶佔式執行CAS嘗試獲取寫鎖成功則設置當前鎖的持有者爲當前線程並返回 true,不然返回 false。

    /**
     * Fair version of Sync
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

解讀:

對於公平鎖,其使用 hasQueuedPredecessors來判斷當前線程節點是否有前驅節點,若是有則當前線程放棄獲取寫鎖的權限,直接返回 false。

Note:

寫鎖是可重入鎖,若是當前線程己經獲取了該鎖,再次獲取時則只是簡單地把可重入次數加 1 後直接返回。

釋放鎖

        /**
         * Attempts to release this lock.
         *
         * <p>If the current thread is the holder of this lock then
         * the hold count is decremented. If the hold count is now
         * zero then the lock is released.  If the current thread is
         * not the holder of this lock then {@link
         * IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread does not
         * hold this lock
         */
        public void unlock() {
            sync.release(1);
        }

解讀:

相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的release方法,而子類Sync只須要實現tryRelease方法便可,對應方法以下:

        /*
         * Note that tryRelease and tryAcquire can be called by
         * Conditions. So it is possible that their arguments contain
         * both read and write holds that are all released during a
         * condition wait and re-established in tryAcquire.
         */
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

解讀:

若是當前線程持有鎖,調用unlock方法會讓持有的 AQS 狀態值減 1,若減 1 後當前狀態值爲 0,則當前線程會釋放鎖,不然僅僅執行減 1。

若是當前線程沒有持有鎖,則調用該方法會拋出 IllegalMonitorStateException異常。

 

讀鎖

讀鎖是使用ReadLock來實現的。

獲取鎖

對應代碼以下:

        /**
         * Acquires the read lock.
         *
         * <p>Acquires the read lock if the write lock is not held by
         * another thread and returns immediately.
         *
         * <p>If the write lock is held by another thread then
         * the current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the read lock has been acquired.
         */
        public void lock() {
            sync.acquireShared(1);
        }

解讀:

相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的acquireShared方法,代碼以下:

    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

重點關注子類中tryAcquireShared的實現,即查看Sync中tryAcquireShared方法:

        @ReservedStackAccess
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

解讀:

若是當前沒有其餘線程持有寫鎖,則當前線程能夠獲取讀鎖,AQS 的狀態值 state的高16位的值會增長 1,而後方法返回;

若是某個線程持有寫鎖,則當前線程會被阻塞。

對應上述代碼分支:

(1)判斷寫鎖是否被佔用,若是是則直接返回 -1,然後調用 AQS 的 doAcquireShared方法把當前線程放入 AQS 阻塞隊列。

(2)變量 r 是獲取到讀鎖的個數

(3)readerShouldBlock方法,由Sync子類實現,公平鎖和非公平鎖的具體實現不同;此處不展開描述,能夠查看本文前面NonfairSync 和 FairSync的實現代碼。

Note:

若是當前要獲取讀鎖的線程己經持有了寫鎖,則也能夠獲取讀鎖;當其處理完事情,須要把讀鎖和寫鎖都釋放掉,不能只釋放寫鎖。

釋放鎖

對應代碼以下:

        /**
         * Attempts to release this lock.
         *
         * <p>If the number of readers is now zero then the lock
         * is made available for write lock attempts. If the current
         * thread does not hold this lock then {@link
         * IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread
         * does not hold this lock
         */
        public void unlock() {
            sync.releaseShared(1);
        }

解讀:

相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的releaseShared方法

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

子類Sync只須要實現tryReleaseShared方法便可,對應方法以下:

        @ReservedStackAccess
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

解讀:

在無限循環裏面,首先獲取當前 AQS 狀態值並將其保存到變量 c,而後變量 c 減去一個讀計數單位後使用 CAS 操做更新 AQS 狀態值。

若是更新成功則查看當前 AQS 狀態值

  • 當前AQS 狀態值是否爲 0,爲 0 則說明當前己經沒有讀線程佔用讀鎖,則 tryReleaseShared 返回 true(隨後會調用 doReleaseShared 方法釋放一個因爲獲取寫鎖而被阻塞的線程)。
  • 當前 AQS 狀態值不爲 0,則說明當前還有其餘線程持有了讀鎖,則 trγReleaseShared 返回 false。

Note:

關於無限循環的解釋——若是 CAS 更新 AQS 狀態值失敗,則自旋重試直到成功。

比較分析

 

 

ReentrantReadWriteLock與ReentrantLock的比較:

(1)相同點:都是一種顯式鎖,手動加鎖和解鎖,都很適合高併發場景

(2)不一樣點:ReentrantLock 是獨佔鎖,ReentrantReadWriteLock是對ReentrantLock的複雜擴展,能適合更復雜的業務場景,ReentrantReadWriteLock能夠實現一個方法中讀寫分離的鎖機制。

相關文章
相關標籤/搜索