java併發編程(7)構建自定義同步工具及條件隊列

構建自定義同步工具

1、經過輪詢與休眠的方式實現簡單的有界緩存

public void put(V v) throws InterruptedException {
        while (true) {                          //輪詢
            synchronized (this) {               //加鎖等待
                if (!isFull()) {                //若是緩存沒滿 則執行插入
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);    //若是緩存滿了 線程等待一段時間後繼續輪詢
        }
    }

    public V take() throws InterruptedException {//同理 相反
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

 

2、經過條件隊列:wait notify方法

   注意:實際邏輯與上面沒區別;且沒法經過輪詢和休眠方式實現的,也沒法經過條件隊列實現緩存

   

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull()) {
            wait();             //等待:釋放鎖且等待;等待通知後獲取鎖 並繼續判斷條件謂詞
        }
        doPut(v);
        notifyAll();            //通知:通知釋放全部等待
    }
 
    // 問題,很差分析,在put 和 take操做後,通知了全部等待條件隊列
    // 並且讓全部 等待都要去判斷 條件謂詞;
    // 如我put完成,只須要通知take正在等待的,而無需通知put正在等待的(反之亦然),由於只有take後,緩存纔有可能不是滿的,纔要去判斷條件謂詞
    public synchronized V take() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
        V v = doTake();
        notifyAll();
        return v;
    }

 

 

 3、內置條件隊列的使用

    條件謂詞:對象在哪一個條件下等待 工具

  條件隊列:每次喚醒時,必須從新檢查條件謂詞優化

 

 4、顯式鎖Condition:自定義條件隊列

  內置條件隊列:每一個內置鎖只能有一個相關聯的條件隊列ui

    因此:多個線程在一個條件隊列上等待不一樣的條件謂詞很差解決this

    方法:wait、notify、notifyAllspa

  而Condition:每一個Lock,能夠有任意的Condition線程

    方法:await、signal、signalAllcode

優化內置的條件隊列:對象

public class ConditionBoundedBuffer <T> {
    protected final Lock lock = new ReentrantLock();
    // 條件謂詞 未滿
    private final Condition notFull = lock.newCondition();
    // 條件謂詞 非空
    private final Condition notEmpty = lock.newCondition();
    private static final int BUFFER_SIZE = 100;
    private final T[] items = (T[]) new Object[BUFFER_SIZE];
    private int tail, head, count;

    // BLOCKS-UNTIL: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();    //當緩存滿了,等待有線程取出緩存;等待未滿謂詞通知(只有take纔有可能讓緩存不滿)
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();      //當插入成功,通知釋放非空謂詞
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();   //當緩存爲空,等待插入;等待非空謂詞通知
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();               //當取出一個,通知釋放未滿謂詞
            return x;
        } finally {
            lock.unlock();
        }
    }
}

 

 

5、AQS:AbstractQueuedSynchronizer

  先來看一個例子:blog

//經過 lock+條件隊列實現信號量Semaphore 許可
public class SemaphoreOnLock {
    private final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: permitsAvailable (permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    private int permits;

    SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: permitsAvailable
    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0)            //當許可<0時 等待 許可釋放
                permitsAvailable.await();
            --permits;
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            ++permits;                      //釋放許可
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

 

  AQS負責管理同步器類中的狀態,它管理了一個整數狀態信息;

  如:經過AQS實現簡單的閉鎖

public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0); 
    }

    private class Sync extends AbstractQueuedSynchronizer { //繼承AQS
        protected int tryAcquireShared(int ignored) {
            // 判斷狀態
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            //釋放狀態
            setState(1); // 打開閉鎖
            return true; // 其餘線程能夠獲取該閉鎖

        }
    }
}

 

小結

  1.最好使用現有的類庫來構建狀態類:如緩存

  2.若是現有類不能知足功能:如從一個空隊列中刪除或獲取元素會拋出異常,固然也有阻塞等待

    那麼使用Lock、內置的條件隊列、Condition顯式的條件隊列、AQS等來構建本身的同步器也是可行的

    內置條件隊列與內置鎖關聯、顯式的條件隊列與Lock關聯

相關文章
相關標籤/搜索