併發編程學習筆記(6)----公平鎖和ReentrantReadWriteLock使用及原理

(一)公平鎖java

  一、什麼是公平鎖?緩存

  公平鎖指的是在某個線程釋放鎖以後,等待的線程獲取鎖的策略是以請求獲取鎖的時間爲標準的,即便先請求獲取鎖的線程先拿到鎖。安全

  二、在java中的實現?併發

  在java的併發包中提供了ReentrantLock提供了重入鎖而且也提供了公平鎖(FairSync)和非公平鎖(NonfairSync)。app

  RenntranLock類的構造方法可傳入一個boolean值做爲標記是不是否用公平鎖,默認是非公平的,非公平鎖咱們與咱們以前學習時實現的可重入鎖原理類似,這裏就再也不詳說,接下來看看公平鎖的源碼:框架

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

FairSync重寫了AQS中的tryAcquire()方法,跟咱們前面的知識點中的須要什麼要的鎖就重寫tryXX方法吻合。這裏看代碼。oop

能夠看到這裏跟咱們前兩兩章學習到的非公平鎖的實現只多了一個hasQueuePredecessors()方法的判斷,這個方法就是判斷當前線程的前一個線程是否也有資格去獲取鎖,只有沒有資格獲取鎖時,當前的線程才能夠返回true.不然就不能獲取鎖,接下來看看hasQueuePredecessors()方法:post

public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

這段代碼表示若是當鏈表爲空時,是確定能夠獲取的鎖的,因此h!=t返回false,前面的方法會繼續向下執行,鏈表不爲空時,當頭節點的下一個節點爲空時,沒有能夠執行的線程,不能獲取鎖,當頭節點的下一個節點不爲空時,可是頭節點的下一個節點的線程不等於當前線程,這樣也是不能獲取的,表示當前線程的前面還有先進來的線程在等待,因此不能獲取鎖,這樣實現了一個公平鎖。公平鎖的使用只須要在實例化ReentrantLock的時候傳入true便可。性能

(二)讀寫鎖(ReentrantReadWriteLock)的使用及原理。學習

  一、什麼是讀寫鎖?

  在實際的應用中,讀的操做是遠遠大於寫操做的,而且讀操做是不會產生線程安全問題的,若是咱們給讀和寫的全部線程都加上互斥鎖,那麼在讀的過程當中會影響很大的性能,因此在java中提供了讀寫鎖,讀寫鎖分爲讀鎖和寫鎖,其中讀鎖和讀鎖共享,讀鎖和寫鎖互斥,寫鎖和寫鎖互斥,這也是前面AQS的獨享和共享模式的具體實現。

  二、讀寫鎖的運用方式

  直接上代碼:

package com.wangx.thread.t5; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Demo { private Map<String,Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock write = readWriteLock.writeLock(); private Lock read = readWriteLock.readLock(); public Object get (String key) { read.lock(); System.out.println(Thread.currentThread().getName() + "讀操做在執行..."); try { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return map.get(key); } finally { read.unlock(); System.out.println(Thread.currentThread().getName() + "讀操做執行完畢..."); } } public void put(String key, Object value) { write.lock(); System.out.println(Thread.currentThread().getName() + "寫操做在執行..."); try { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); } finally { write.unlock(); System.out.println(Thread.currentThread().getName() + "寫操做執行完畢..."); } } }

因爲hashMap是線程不安全的,可是又是須要讀寫都進行的,因此使用map的get和put方法能夠很好的模擬線程安全問題和讀寫鎖的使用,運用很簡單,就是在線實例化ReentrantReadWriteLock,在經過該對象分別獲取讀寫鎖,在讀的地方加上讀鎖,寫的地方加上寫鎖。

  三、讀寫鎖源碼實現

  首先獲取看寫鎖源碼:

public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

ReentrantReadWriteLock的writeLock方法返回writeLock,在構造方法中又實例化了WriterLock,WriteLock是ReentrantReadWriteLock的內部類,實現了Lock接口,保證了鎖的全部方法的功能。再看WriteLock中的lock方法,

public void lock() { sync.acquire(1); }

ReentrantReadWriteLock的內部幫助器,因此這裏調用的其實是外部類的Sync內部類,接着進入該類的acquire()方法:

protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire
                setState(c + acquires); return true; } if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

繼承AQS框架的實現都是經過操做state來進行鎖的獲取和釋放,因此getState()方法就不說了,接下來看看exclusiveCount()方法:

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

這裏是經過十六位來保存讀鎖和寫鎖的,高8位保存寫鎖,低八位保存讀鎖,exclusiveCount()傳入1時返回值爲自己。表示當前只能有一個線程持有獨享鎖。c等於0時,第一個線程第一次進來,操做和原理與以前的本身實現鎖相似,就不詳細說了,當c不等於0時,表示此時是有線程在執行的,可是w等於0則表示當前執行的是持有共享鎖的縣城,或者當前線程不是重入線程的狀況下,返回false,獲取線程失敗,而且當寫線程個數操過了最大數,也會獲取鎖失敗,不然更改狀態,返回true,獲取重入鎖成功。這就是寫鎖的實現,就是判斷在寫線程的狀況下是否有其餘線程在使用鎖。

  寫線程的unlock也是調用WriteLock的中的unlock,最後調用sync中的tryRelease方法,因此重點來看tryRelease方法:

protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }

當線程釋放時,先判斷,若是不是獨享線程,確定是有異常的,直接拋出異常,這個釋放方法只能是獨享線程來調用,作重線程的減減操做,當持有鎖的線程爲0,切重入鎖爲0時,釋放鎖,更改狀態。完成寫鎖的釋放。

  讀鎖的實現源碼,前面說過,讀鎖就是AQS的共享模式的具體運用,跟寫鎖相似,咱們直接在ReentrantReadWriteLock的內部類的Sync中找到tryAcquireShared()方法:

protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }

第一步:判斷是否有線程持有獨享鎖的存在,即exclusiveCount不爲0,而且當前鎖不是獨享鎖,返回-1表示獲取鎖失敗。

第二步:獲取持有共享鎖的線程數量,readerShouldBlock()線程公平的判斷,判斷當前持有共享鎖的線程是否小於最大線程數,是更改狀態。

第三步:當r等於0時,表示當前線程是第一個,則將第一個線程保存,並將第一個線程的重入次數計爲1。若是進來的線程是第一個的線程的重入,則重入次數加1。

第四步:HoldCounter用來保存持有持有共享鎖的線程個數及線程id,當第二個能共享的線程進入時,rh==null時,readHolds獲取到當前線程,readHolds經過ThreadLocal<HoldCounter> 保證線程安全。

第五步:當rh != null 時,且rh.tid != getThreadId(current),即緩存的線程不是當前線程,便是進來的一個新的持共享鎖的線程,則也獲取當前線程的信息存到cachedHoldCounter中。

第六步:以上條件不成立時,表示是一個重入線程進入,當他的重入次數爲0,表示第一次進入,將其添加到readHolds中,並將count++記錄某個線程重入的次數,這樣就能保證存入的共享線程個數和每一個線程重入的次數。全部操做成功,則放回1,表示獲取鎖成功,不然fullTryAcquireShared()對獲取失敗的各類緣由進行處理,最後返回結果。

  tryReleaseShared()方法:

protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free.
                    return nextc == 0; } }

這裏只分析跟獨享鎖不一樣的部分,若是當前線程是第一個進來的共享線程,且重入次數爲1時,釋放該線程的共享鎖,不然重入次數減1,當前線程不等於緩存的線程時,獲取readholds中的線程,獲得獲取到的線程衝入次數,當小於等於1時移除該線程,也就是釋放該線程鎖,小於等於0時拋出異常,不爲1時則重入次數減減,當持有共享鎖的全部線程都移除後,不斷自旋,直到成功釋放鎖,返回狀態nextc是否爲0。表示該鎖是否能夠釋放。這樣就成功地釋放了共享鎖。

(三)這裏有個降級鎖,就是在讀和寫可能在同一個操做中是,咱們須要將寫鎖降級爲讀鎖,以保證線程安全,示例代碼:

package com.wangx.thread.t6; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Demo { private Map<String,Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock write = readWriteLock.writeLock(); private Lock read = readWriteLock.readLock(); private volatile boolean isUpdate; public void readWrite() { read.lock(); if (isUpdate) { read.unlock(); write.lock(); map.put("xxx", "xxx"); read.lock(); write.unlock(); } Object object = map.get("xxx"); System.out.println(object); read.unlock(); } }

以上就是讀寫鎖和公平鎖我所瞭解的知識,今天的分享就到此爲止,不足之處,忘各位指出。

相關文章
相關標籤/搜索