AQS使用的同步隊列是基於一種CLH鎖算法來實現。CLH鎖也是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,若是發現前驅釋放了鎖就結束自旋.java
同步器中包含了兩個節點類型的引用,一個指向頭節點(head),一個指向尾節點(tail),沒有獲取到鎖的線程,加入到隊列的過程必須保證線程安全,所以同步器提供了一個基於CAS的設置尾節點的方法
CompareAndSetTail(Node expect,Node update)
,它須要傳遞當前線程認爲的尾節點和當前節點,只有設置成功後,當前節點才能正式與以前的尾節點創建關聯。
同步器隊列遵循FIFO
,首節點是獲取鎖成功的節點,首節點的線程在釋放鎖時,會喚醒後續節點,然後繼節點在成功獲取到鎖後,會把本身設置成首節點,設置首節點是由獲取鎖成功的線程來完成的,因爲只有一個線程能成功獲取到鎖,因此設置首節點不須要CAS
。
package com.rumenz.task.aqs; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class MyLock { private static final Sync STATE_HOLDER = new Sync(); /** * 經過Sync內部類來持有同步狀態, 當狀態爲1表示鎖被持有,0表示鎖處於空閒狀態 */ private static class Sync extends AbstractQueuedSynchronizer { /** * 是否被獨佔, 有兩種表示方式 * 1. 能夠根據狀態,state=1表示鎖被佔用,0表示空閒 * 2. 能夠根據當前獨佔鎖的線程來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔 */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() != null; } /** * 嘗試獲取鎖,將狀態從0修改成1,操做成功則將當前線程設置爲當前獨佔鎖的線程 */ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 釋放鎖,將狀態修改成0 */ @Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } } /** * 下面的實現Lock接口須要重寫的方法,基本是就是調用內部內Sync的方法 */ public void lock() { STATE_HOLDER.acquire(1); } public void unlock() { STATE_HOLDER.release(1); } }
package com.rumenz.task.aqs; import org.omg.Messaging.SYNC_WITH_TRANSPORT; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class LockTest { private final static Integer clientTotal=100000; private final static Integer threadTotal=200; private static Count count=new Count(); private static Count unSafe=new Count(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); final Semaphore semaphore=new Semaphore(threadTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(()->{ try{ semaphore.acquire(); count.getIncrement(); unSafe.getUnSafeIncrement(); semaphore.release(); }catch (Exception e){ e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("safe:"+count.getCount()); System.out.println("unSafe:"+unSafe.getCount()); executorService.shutdown(); } } class Count{ private MyLock myLock; private volatile int count; Count() { this.myLock=new MyLock(); } int getCount(){ return count; } int getIncrement(){ myLock.lock(); count++; myLock.unlock(); return count; } int getUnSafeIncrement(){ count++; return count; } }
safe:100000 unSafe:99995
關注微信公衆號:【入門小站】,解鎖更多知識點算法