Java高級-線程同步lock與unlock使用

1、Lock與Synchronized區別java

Java中可使用Lock和Synchronized的能夠實現對某個共享資源的同步,同時也能夠實現對某些過程的原子性操做。算法

Lock可使用Condition進行線程之間的調度,Synchronized則使用Object對象自己的notify, wait, notityAll調度機制,這兩種調度機制有什麼異同呢?編程

Condition是Java5之後出現的機制,它有更好的靈活性,並且在一個對象裏面能夠有多個Condition(即對象監視器),則線程能夠註冊在不一樣的Condition,從而能夠有選擇性的調度線程,更加靈活。 api

Synchronized就至關於整個對象只有一個單一的Condition(即該對象自己)全部的線程都註冊在它身上,線程調度的時候以後調度全部得註冊線程,沒有選擇權,會出現至關大的問題 。多線程

因此,Lock 實現提供了比使用 synchronized 方法和語句可得到的更普遍的鎖定操做。此實現容許更靈活的結構,能夠具備差異很大的屬性,能夠支持多個相關的 Condition 對象。 併發

鎖是控制多個線程對共享資源進行訪問的工具。一般,鎖提供了對共享資源的獨佔訪問。一次只能有一個線程得到鎖,對共享資源的全部訪問都須要首先得到鎖。不過,某些鎖可能容許對共享資源併發訪問,如 ReadWriteLock 的讀取鎖。dom

synchronized 方法或語句的使用提供了對與每一個對象相關的隱式監視器鎖的訪問,但卻強制全部鎖獲取和釋放均要出如今一個塊結構中:當獲取了多個鎖時,它們必須以相反的順序釋放,且必須在與全部鎖被獲取時相同的詞法範圍內釋放全部鎖。 ide

雖然 synchronized 方法和語句的範圍機制使得使用監視器鎖編程方便了不少,並且還幫助避免了不少涉及到鎖的常見編程錯誤,但有時也須要以更爲靈活的方式使用鎖。例如,某些遍歷併發訪問的數據結果的算法要求使用 "hand-over-hand" 或 "chain locking":獲取節點 A 的鎖,而後再獲取節點 B 的鎖,而後釋放 A 並獲取 C,而後釋放 B 並獲取 D,依此類推。Lock 接口的實現容許鎖在不一樣的做用範圍內獲取和釋放,並容許以任何順序獲取和釋放多個鎖,從而支持使用這種技術。 工具

2、java.util.concurrent.locks類結構ui

locks

上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,爲什麼圖中沒有用UML線表示呢,這是每一個Lock實現類都持有本身內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。爲什麼要實現不一樣的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。

基於AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不同而已。

ReentrantLock:須要記錄當前線程獲取原子狀態的次數,若是次數爲零,那麼就說明這個線程放棄了鎖(也有可能其餘線程佔據着鎖從而須要等待),若是次數大於1,也就是得到了重進入的效果,而其餘線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。如下爲ReetranLock的FairSync的tryAcquire實現代碼解析:

image

Semaphore:則是要記錄當前還有多少次許可可使用,到0,就須要等待,也就實現併發量的控制,Semaphore一開始設置許可數爲1,實際上就是一把互斥鎖。如下爲Semaphore的FairSync實現:

image

CountDownLatch:閉鎖則要保持其狀態,在這個狀態到達終止態以前,全部線程都會被park住,閉鎖能夠設定初始值,這個值的含義就是這個閉鎖須要被countDown()幾回,由於每次CountDown是sync.releaseShared(1),而一開始初始值爲10的話,那麼這個閉鎖須要被countDown()十次,纔可以將這個初始值減到0,從而釋放原子狀態,讓等待的全部線程經過。

image

FutureTask:須要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續仍是park,而FutureTask的tryAcquireShared方法所作的惟一事情就是檢查狀態,若是是RUNNING狀態那麼讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,而且經過unpark喚醒正在等待的線程,返回結果。

image

以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪裏呢?而且是能夠計數的。從以上4個example,咱們能夠很快獲得答案,AQS提供給了子類一個int state屬性。而且暴露給子類getState()和setState()兩個方法(protected)。這樣就爲上述狀態解決了存儲問題,RetrantLock能夠將這個state用於存儲當前線程的重進入次數,Semaphore能夠用這個state存儲許可數,CountDownLatch則能夠存儲須要被countDown的次數,而Future則能夠存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其餘的Synchronizer存儲他們的一些狀態。

AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法爲須要獨佔形式獲取的synchronizer實現的,好比線程獨佔ReetranLock的Sync,而tryAcquireShared和tryReleasedShared爲須要共享形式獲取的synchronizer實現。
ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(由於獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現)Semaphore內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch類似,由於公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。CountDownLatch內部類Sync實現了tryAcquireShared和tryReleasedShared。FutureTask內部類Sync也實現了tryAcquireShared和tryReleasedShared。

其實使用過一些JAVA synchronizer的以後,而後結合代碼,可以很快理解其究竟是如何作到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實咱們本身也能夠構造synchronizer。以下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。

image

3、lock與unlock使用

下面是一個場景,針對這個場景提出兩種解決方案。 一箇中轉站,能夠接納貨物,而後發出貨物,這是須要建一個倉庫,至關於一個緩衝區,當倉庫滿的時候,不能接貨,倉庫空的時候,不能發貨。

第一種,用一個Condition去解決,有可能會出問題。

package com.zxx;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 單個Condition去控制一個緩衝區,多線程對緩衝區作讀寫操做,要保證緩衝區滿的時侯不會
* 被寫,空的時候不會被讀;單個Condition控制會出錯誤: 當緩衝區還有一個位置時,多個寫線程
* 同時訪問,則只有一個寫線程能夠對其進行寫操做,操做完以後,喚醒在這個condition上等待的
* 其餘幾個寫線程,若是判斷用IF語句的話就會出現繼續向緩衝區添加。
* @author Administrator
*
*/
public class ConditionError {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    String[] container = new String[10];
    int index = 0;
    public static void main(String[] args) {
        ConditionError conditionError = new ConditionError();
        conditionError.test();
    }
    public void test(){
        ExecutorService threadPool = Executors.newCachedThreadPool();
        for(int i = 0; i < 14; i++){//先用14個線程去寫,則有4個線程會被阻塞
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    put();
                }
            });
        }
        Executors.newSingleThreadExecutor().execute(new Runnable(){//用一個線程去取,則會通知4個阻塞的寫線程工做,此時
                                                                //會有一個線程向緩衝區寫,寫完後去通知在這個condition上等待
                                                                //的取線程,這是它的本意,可是它喚醒了寫線程,由於只有一個condition
                                                                //不能有選擇的喚醒寫取線程,此時就須要有多個Condition
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                get();
            }
        });
    }
    /**
     * 向緩衝去寫數據
     */
    public void put(){
        lock.lock();
        try{
            System.out.println(Thread.currentThread().getName() + "當前位置:" + index + "-----------------------------");
            while(index == 10){
                try {
                    System.out.println(Thread.currentThread().getName() + "處於阻塞狀態!");
                    condition.await();
//                    index = 0;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            container[index] = new String(new Random().nextInt() + "");
            condition.signalAll();
            index ++;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 從緩衝區拿數據
     */
    public void get(){
        lock.lock();
        try{
            while(index == 0){
                try {
                    System.out.println("get--------" + Thread.currentThread().getName() + "處於阻塞");
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            index --;
            System.out.println("get---------" + Thread.currentThread().getName() + "喚醒阻塞");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

第二種解決方案,用java api中的 一個例子 。

image

相關文章
相關標籤/搜索