AQS同步隊列結構分析

同步隊列結構

AQS使用的同步隊列是基於一種CLH鎖算法來實現。

CLH鎖也是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,若是發現前驅釋放了鎖就結束自旋.java

img

同步器中包含了兩個節點類型的引用,一個指向頭節點(head),一個指向尾節點(tail),沒有獲取到鎖的線程,加入到隊列的過程必須保證線程安全,所以同步器提供了一個基於CAS的設置尾節點的方法 CompareAndSetTail(Node expect,Node update),它須要傳遞當前線程認爲的尾節點和當前節點,只有設置成功後,當前節點才能正式與以前的尾節點創建關聯。

image-20210113202222401

同步器隊列遵循 FIFO,首節點是獲取鎖成功的節點,首節點的線程在釋放鎖時,會喚醒後續節點,然後繼節點在成功獲取到鎖後,會把本身設置成首節點,設置首節點是由獲取鎖成功的線程來完成的,因爲只有一個線程能成功獲取到鎖,因此設置首節點不須要 CAS

image-20210113202940764

AQS實現一個線程安全的計數器

自定義互斥鎖

package com.rumenz.task.aqs;


import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock {

    private static final Sync STATE_HOLDER = new Sync();

    /**
     * 經過Sync內部類來持有同步狀態, 當狀態爲1表示鎖被持有,0表示鎖處於空閒狀態
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 是否被獨佔, 有兩種表示方式
         *  1. 能夠根據狀態,state=1表示鎖被佔用,0表示空閒
         *  2. 能夠根據當前獨佔鎖的線程來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() != null;
        }

        /**
         * 嘗試獲取鎖,將狀態從0修改成1,操做成功則將當前線程設置爲當前獨佔鎖的線程
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 釋放鎖,將狀態修改成0
         */
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

    }

    /**
     * 下面的實現Lock接口須要重寫的方法,基本是就是調用內部內Sync的方法
     */
    public void lock() {
        STATE_HOLDER.acquire(1);
    }

    public void unlock() {
        STATE_HOLDER.release(1);
    }
}

測試案例

package com.rumenz.task.aqs;

import org.omg.Messaging.SYNC_WITH_TRANSPORT;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class LockTest {
    private final static Integer clientTotal=100000;
    private final static Integer threadTotal=200;
    private static Count count=new Count();
    private static Count unSafe=new Count();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
        final Semaphore semaphore=new Semaphore(threadTotal);

        for (int i = 0; i < clientTotal; i++) {

            executorService.execute(()->{
                try{
                    semaphore.acquire();
                    count.getIncrement();
                    unSafe.getUnSafeIncrement();
                    semaphore.release();

                }catch (Exception e){
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("safe:"+count.getCount());
        System.out.println("unSafe:"+unSafe.getCount());
        executorService.shutdown();



    }


}

class Count{
    private MyLock myLock;
    private volatile int count;

     Count() {
        this.myLock=new MyLock();
    }

     int getCount(){
        return count;
    }
     int getIncrement(){
        myLock.lock();
        count++;
        myLock.unlock();
        return count;
    }
     int getUnSafeIncrement(){
        count++;
        return count;
    }
}

輸出結果

safe:100000
unSafe:99995

關注微信公衆號:【入門小站】,解鎖更多知識點算法

相關文章
相關標籤/搜索