併發編程之:併發控制

1.內存模型

在Java中每一個線程有一塊工做內存區,其中存放着被全部線程共享主內存中數據值的拷貝。當線程執行時,它在本身內存中操做這些變量。java

使用(use)、賦值(assign)、鎖定(lock)和解鎖(unlock)操做都是線程執行引擎和線程工做內存的原子操做。但主內存和線程工做內存的數據交換並不知足原子性。多線程

當數據從主內存複製到線程工做內存時有兩個動做:第一由主內存執行的讀(read)操做,第二由工做內存執行load操做。併發

當數據從工做內存拷貝到主內存時也有兩個動做:第一由工做內存執行的存儲(store)操做,第二由主內存執行的寫操做。ide

各個操做含義函數

use:把一個變量在線程工做內存中的拷貝內容傳送給線程執行引擎。高併發

assign:把一個變量從線程執行引擎傳送到變量的線程工做內存。性能

read:把一個變量的主內存拷貝的內容傳輸到線程工做內存,以便load操做使用。測試

load:把read操做從主內存中獲得的值放入到線程的工做內存中。優化

store:把一個變量的線程工做內存拷貝內容傳送到主內存中,以便write操做使用。ui

write:把store操做從線程工做內存中獲得的值放入到主內存的變量拷貝中。

lock:使線程得到一個獨佔鎖。

unlock:釋放一個線程的獨佔鎖。

double和long類型變量的非原子處理:若是一個double或者long變量沒有聲明爲volatile,則變量在進行read或write操做時,主內存把他當作兩個32位的read或write操做進行處理,着兩個操做在時間上是分開的,可能會有其餘操做穿插其中。若是這種狀況方法,則兩個併發的線程對共享的非volatile類型的double或long變量賦不一樣的值,那麼隨後對該變量的使用而獲取的值可能不能等於任何一個線程所賦的值。所以在32位系統中,必須對double或long進行同步。

2.volatile變量使用

能夠作以下保證:

(1)其餘線程對變量的修改,能夠及時反應到當前線程中。

(2)確保當前線程對volatile變量的修改,能及時寫回共享主內存中,並被其餘線程所見。

(3)使用volatile聲明的變量,編譯器會保證其有序性。

注意:使用volatile標識的變量,將迫使全部線程均讀寫主內存中對應的變量,從而使得volatile變量在多線程中可見。

public class VolatileTest extends Thread{
        private volatile boolean isStop = false;
        public void stopMe(){
            isStop = true;
        }

        public void run(){
            int i = 0;
            while (!isStop){
                i++;
            }
            System.out.println("stop thread !");
        }

    public static void main(String[] args) throws InterruptedException {
        VolatileTest m = new VolatileTest();
        m.start();
        System.out.println("啓動線程");
        Thread.sleep(2000L);
        m.stopMe();
        System.out.println("關閉線程");
        Thread.sleep(2000L);
    }
}

單例模式使用volatile

public class Singleton {
    public static volatile Singleton singleton;

    /**
     * 構造函數私有,禁止外部實例化
     */
    private Singleton() {};

    public static Singleton getInstance() {
        if (singleton == null) {
            synchronized (singleton) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }
}

單例模式使用volatile的必要性,要理解這個問題首先要了解實例化一個對象步驟:

(1)分配內存空間

(2)初始化對象

(3)內存空間地址賦值給對應的引用。

可是因爲操做系統能夠對指令進行重排序,因此上面的過程也可能會變成以下過程:

(1)分配內存空間。

(2)將內存空間的地址賦值給對應的引用。

(3)初始化對象

若是是這個流程,多線程環境下就可能將一個未初始化的對象引用暴露出來,從而致使不可預料的結果。所以,爲了防止這個過程的重排序,咱們須要將變量設置爲volatile類型的變量。

3.同步關鍵字synchronized

在早期版本中synchronized性能並不算太好,可是隨着JVM虛擬機不停的改進並優化synchronized,在JDK6中,synchronized和非公平鎖的差距已經縮小,且從長遠來看synchronized的性能還將會作進一步優化。而且synchronized的使用也比其餘同步方式相比更爲的簡潔明瞭。

只是使用synchronized還不足以控制複雜邏輯的線程交換,還要配合Object對象的wait和notify方法。

wait方法可讓當前線程進入等待狀態,在wait過程當中線程會釋放對象鎖。當調用該對象的notify方法時,在該對象上面等待的線程會從新得到對象鎖繼續往下執行。當有多個線程在該對象上等待時,notify會隨機選取其中一個。

下面就是一個阻塞隊列的例子:

//阻塞隊列來進行測試
class BlockQueue{

    private List list = new ArrayList();

    public synchronized Object pop() throws InterruptedException{
        while (list.size() == 0){ //若是隊列爲空,則等待
            System.out.println(Thread.currentThread().getName()+":進來等待!");
            wait();
            System.out.println(Thread.currentThread().getName()+":等待結束!");
        }
        if(list.size()>0){
            System.out.println(Thread.currentThread().getName()+":取值成功!");
            return list.remove(0);
        }else{
            return null;
        }

    }

    public synchronized void put(Object o){
        list.add(o);
        System.out.println("放入數據!");
        notify();//通知等待的線程能夠取數據
    }

}
class PopWorker implements Callable<Object>{

    BlockQueue queue;

    PopWorker(BlockQueue queue){
        this.queue = queue;
    }

    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName()+":開始取值!");
        return queue.pop();
    }
}

開多個線程去一個阻塞隊列中取值

public static void main(String[] args) throws InterruptedException {
    BlockQueue queue = new BlockQueue();
    //開多個線程去取值
    PopWorker worker1 = new PopWorker(queue);
    PopWorker worker2 = new PopWorker(queue);
    PopWorker worker3 = new PopWorker(queue);
    ExecutorService service = Executors.newCachedThreadPool();
    service.submit(worker1);
    service.submit(worker2);
    service.submit(worker3);

    //保證三個線程都進入wait狀態
    Thread.sleep(2000);
    queue.put("1");
}

執行結果:只有線程「pool-1-thread-1」被喚醒取到值,其餘兩個線程都繼續等待狀態。

pool-1-thread-1:開始取值!
pool-1-thread-2:開始取值!
pool-1-thread-3:開始取值!
pool-1-thread-1:進來等待!
pool-1-thread-3:進來等待!
pool-1-thread-2:進來等待!
放入數據!
pool-1-thread-1:等待結束!
pool-1-thread-1:取值成功!

4. 重入鎖ReentrantLock

JDK5中在高併發狀況下比synchronized有明顯的性能優點,在JDK6中因爲JVM的優化,性能相差不大。

ReentrantLock提供了公平鎖和非公平鎖兩種方式。公平鎖能夠保證在鎖的等待隊列中各線程是公平的,所以不會出現插隊狀況,對鎖的獲取老是先進先出,而非公平鎖不作這個保證。

公平鎖的實現代價比非公平鎖大,所以在性能上分析,非公平鎖性能要好的多。能夠經過ReentrantLock的構造方法指定生產公平鎖仍是非公平鎖:

public ReentrantLock(boolean fair)

主要方法:

lock.lock();//得到鎖,若是已經被佔用則等待(在等待中不能被中斷)
lock.tryLock();//嘗試得到鎖,若是成功返回TRUE,不等待
lock.lockInterruptibly();//得到鎖,但優先響應中斷(在等待中能夠被中斷)
lock.unlock();//釋放鎖

例如:

class Worker implements Runnable{
    ReentrantLock lock;
    String name;
    public Worker(String name,ReentrantLock lock){
        this.name = name;
        this.lock = lock;
    }
    @Override
    public void run() {
       try {
           System.out.println(name+":準備獲取鎖");
           lock.lock();
           System.out.println(name+":獲取鎖");
           Thread.sleep(10000);
       }catch (Exception e){
           e.printStackTrace();
       }finally {
           System.out.println(name+":釋放鎖");
           lock.unlock();
       }
    }
}

main方法

ReentrantLock lock = new ReentrantLock();

Worker w1 = new Worker("w1",lock);
Worker w2 = new Worker("w2",lock);
new Thread(w1).start();
new Thread(w2).start();

5. ReadWriteLock讀寫鎖

ReadWriteLock是jdk5裏面提供的讀寫分離鎖。讀寫鎖容許多個線程同時對資源進行讀操做,寫寫操做或讀寫操做則依然須要互相等待。(這裏讀鎖存在的意義就是在資源進行寫鎖控制時不容許讀,讀時不容許寫)。

下面的例子

先分別開兩個寫鎖:證實兩個寫鎖是互斥的。

再開三個讀鎖:證實在寫鎖沒有釋放時讀鎖不能得到,寫鎖釋放時三個線程能夠同時獲取讀鎖。

1)定義讀寫資源

class Handler{
    private  Lock readLock ;
    private  Lock writeLock ;
    private Map<String,String> map = new HashMap<>();

    public Handler(ReentrantReadWriteLock readWriteLock){
        readLock = readWriteLock.readLock();
        writeLock = readWriteLock.writeLock();
    }

    public String read(){
        try{
            System.out.println(Thread.currentThread().getName()+":開始獲取讀鎖");
            readLock.lock();
            System.out.println(Thread.currentThread().getName()+":獲取讀鎖");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return map.get("java");
        }finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName()+":釋放讀鎖");
        }
    }

    public void write(String value){
        try {
            System.out.println(Thread.currentThread().getName()+":開始獲取寫鎖");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName()+":獲取寫鎖");
            map.put("java",value);
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName()+":釋放寫鎖");
        }
    }
}

2)分別定義讀和寫操做

class Reader implements Runnable{

    private Handler handler;

    public Reader(Handler handler){
        this.handler = handler;
    }

    @Override
    public void run() {
        handler.read();
    }
}

class Writer implements Runnable
{
    private Handler handler;

    public Writer(Handler handler){
        this.handler = handler;
    }
    @Override
    public void run() {
        handler.write("hello world !");
    }
}

主方法調用

public static void main(String[] args) throws InterruptedException {
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Handler handler = new Handler(readWriteLock);

    Writer writer1 = new Writer(handler);
    Writer writer2 = new Writer(handler);

    Reader reader1 = new Reader(handler);
    Reader reader2 = new Reader(handler);
    Reader reader3 = new Reader(handler);

    ExecutorService service = Executors.newCachedThreadPool();
    service.execute(writer1);
    service.execute(writer2);
    Thread.sleep(20000) ;//先開兩個寫進程分別佔用寫鎖30秒,20秒以後開三個讀鎖
    service.execute(reader1);
    service.execute(reader2);
    service.execute(reader3);
    service.shutdown();
}

後面有先用讀佔用鎖,寫線程去獲取寫鎖,發現讀鎖被佔用時寫鎖也是不能得到的,因此讀寫鎖是互斥的。讀的時候不能獲取寫,寫的時候不能獲取讀。

6. Condition對象

線程間的協做光有鎖是不夠的,Condition能夠用於協做線程間複雜的操做。Condition對象是與lock綁定的因此就有Lock的公平性特性:若是是公平鎖,線程爲按照FIFO的順序從Condition.await中釋放,若是是非公平鎖,那麼後續的鎖競爭就不保證FIFO順序了。

Condition就至關是在使用synchronized時咱們須要使用Object的wait/notify/notifyAll這樣的方法來控制更爲複雜的業務邏輯。

Condition方法:

await*( ) 線程釋放鎖進入等待狀態。

singal*( )線程被喚醒獲取鎖繼續執行。

這裏咱們用一個例子來講明這個問題,用一個線程不停往裏面放入數據,另外一個線程不停從裏面取數據,當隊列滿時放數據線程釋放鎖進入等待狀態同時喚醒等待中取數據線程進行取數據;同理當隊列中數據爲空時取數據線程釋放鎖進入等待狀態,同時喚醒放數據線程進行放數據操做。

//構造一個阻塞隊列
class MyBlockList{

    private String[] items ;
    private volatile int count;
    private Lock lock = new ReentrantLock();

    private Condition canPut = lock.newCondition();
    private Condition canTake = lock.newCondition();

    public MyBlockList(int len){
        items = new String[len];//初始化時肯定隊列長度
        count = 0;//元素數量
    }

    public String take(){
        try {
            lock.lockInterruptibly();
            if(count == 0){//若是沒有元素進行等待
                System.out.println("隊列爲空不能拿了");
                canTake.await();
                System.out.println("能夠放被喚醒");
            }
            count--;
            String result = items[count];
            System.out.println("取走一個剩:"+count);
            Thread.sleep(1000);
            canPut.signal();//拿走了一個,發出能夠放信號
            return result;
        } catch (InterruptedException e) {
            canTake.signal();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return "";
    }

    public void put(String value){
        try {
            lock.lockInterruptibly();
            if(count == items.length){//若是已經滿了則不能放只能等收到能夠放信號
                System.out.println("隊列滿不能放了");
                canPut.await();
                System.out.println("能夠取被喚醒");
            }
            count++;
            items[count-1] = value;
            System.out.println("放入一個:當前有"+count);
            Thread.sleep(1000);
            canTake.signal();//從新放了一個,發出取信號
        } catch (InterruptedException e) {
            e.printStackTrace();
            canPut.signal();
        } finally {
            lock.unlock();
        }
    }
}

在main方法中開兩個線程進行存取數據操做

public static void main(String[] args) {
   final MyBlockList queue = new MyBlockList(5);

    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true){
                queue.take();
            }

        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {

            while (true){
                queue.put("hello world !");
            }
        }
    }).start();

}

7. Semaphore信號量

信號量對鎖的概念進行了擴展,它能夠限定對某一資源的訪問最大線程數。

舉個例子,就好比說有一個資源只能給5個客戶端訪問,這樣咱們就能夠構造一個准入數爲5的信號量,當一個客戶端進入訪問資源就標記一個信號量,當信號量都被標記完了則說明資源訪問達到最大值,當有客戶端結束訪問,信號量就釋放一個,其餘排隊等待的客戶端就能夠准許進入訪問資源。

public static void main(String[] args) {
   final Semaphore semaphore = new Semaphore(5);

    ExecutorService service = Executors.newCachedThreadPool();

    for (int i=0;i<=20;i++){//總共有20個線程要訪問,只放入5個

        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"線程進入");
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"線程獲取信號量");
                    Thread.sleep(5000);
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName()+"線程釋放信號量");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        service.execute(run);
    }

    //執行完關閉線程池
    service.shutdown();
}

8. ThreadLock線程局部變量

ThreadLock爲每一個線程提供變量的獨立副本。

從起set方法能夠看到,每一個線程都有一個單獨的map用來存儲該線程放入的數據

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

9. 鎖性能和優化

1. 多線程會形成額外的開銷因此多線程的使用並非線程越多越好。

2. 避免死鎖

    形成死鎖必需要知足如下四個條件:互斥條件、請求與保持條件、不剝奪條件、循環等待條件。

3. 減少鎖持有時間

4. 減少鎖粒度(ConcurrentHashMap)

5. 讀寫分離鎖來替換獨佔鎖(ReadWriteLock)

6. 鎖分離(LinkedBlockQueue)

7. 重入鎖(ReentrantLock)和內部鎖(synchronized)

8. 鎖粗化:過於頻繁對鎖的請求、同步、釋放會使系統資源開銷大大增長反而會是系統性能降低

    頻繁對鎖請求:

for(int i=0;i<100;i++){
    synchronized (lock){
        //do sth
    }
}

    鎖粗化以後:

synchronized (lock){
    for (int i=0;i<100;i++){
        //do sth
    }
}

9. 自旋鎖

    由於線程的掛起、恢復是須要較多系統資源的,若是這段時間開銷比鎖等待所須要的時間開銷要大不少,所以在這種狀況下咱們有可能情願進行鎖等待也不肯意去把線程掛起,而後在得到鎖時再恢復。

    所以JVM引入了自選鎖,自旋鎖可使線程在沒有取的鎖時,不被掛起,而轉而去執行空循環(即所謂的自旋)若干個空循環後線程得到鎖,則繼續執行,若是沒有得到鎖纔會被掛起。這樣使得線程被掛起的概率相對減小。(對鎖競爭不激烈,鎖佔用時間比較短的併發操做有積極做用)。

    JVM虛擬機提供-XX:+UseSprinning參數來開啓自選鎖,使用-XX:PreBlockSpin參數設置自旋鎖的等待次數。

10. 消除鎖

    JVM在編譯時進行逃逸分析(對上下文掃描)能夠消除那些沒必要要的鎖。好比在方法內部使用StringBuffer進行字符串操做(該操做就不會產生線程間鎖競爭)。

    能夠經過JVM參數配置,是否進行逃逸分析和鎖消除

    開啓:-server -XX:-DoEscapeAnalysis -XX:-EliminateLocks

    關閉:-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks

11. 鎖偏向

     若是程序沒有競爭,則取消以前已經取得鎖同步操做。當某一鎖被線程獲取以後,便進入偏向模式,當線程再次請求這個鎖時,無需再進行相關的同步操做。

     JVM中能夠經過配置參數開啓或關閉鎖偏向

     開啓:+XX:UseBiasedLocking

     關閉:-XX:UseBiasedLocking

相關文章
相關標籤/搜索