Java多線程整理(li)

目錄:html

1.volatile變量java

2.Java併發編程學習算法

3.CountDownLatch用法編程

4.CyclicBarrier使用api

5.BlockingQueue使用數組

6.任務執行器Executor
7.CompletionService使用
8.ConcurrentHashMap使用
9.Lock使用緩存

 

 

1、 volatile變量安全

  1.volatile原理:volatile的原理其實是告訴處理器,不要把變量緩存在寄存器或者相對於其餘處理器不可見的地方,而是把變量放在主存,每次讀寫操做都在主存上進行操做。另外,被申明爲volatile的變量也不會與其它內存中的變量進行重排序。
服務器

  2.volatile同步:volatile是同步的一個子集,只保證了變量的可見性,可是不具有原子特性。這就是說線程可以自動發現 volatile 變量的最新值。相對於同步而言,volatile的優點:a.簡易性,能夠像使用其餘變量同樣使用volatile變量;b.volatile變量不會形成線程阻塞;c.若是讀操做遠遠大於寫操做,volatile 變量還能夠提供優於鎖的性能優點網絡

  3.正確使用volatile條件:對變量的寫操做不依賴於當前值;該變量沒有包含在具備其餘變量的不變式中;

/*
 * 對於第一條原則:對變量的寫操做不依賴於當前值;
 * 雖然i++只有一條語句,實際上這條語句是分三步執行的,讀入i,i加1,寫入i;
 * 若在第三步執行過程前,其餘線程對i進行了改動,此時的結果將是錯的。所以即便使用了volatile進行控制,並不能保證這個操做是線程安全的。
*/

private volatile int i=1;
... 
i++;

/*
 * 這類問題的解決方案有兩種: 
 * 1.一種是採用synchronized進行同步控制,這顯然違背了volatile的初衷
 * 2.一種是採用CPU原語進行控制。在jdk1.5以後,java.util.concurrent.atomic包下的不少類就是採用這種方式進行控制,這樣能夠在保持性能的狀況下,保證數據的線程安全。
 */

2、Java併發編程學習

  在java 併發編程實踐中對線程安全的定義以下:當多個線程訪問一個類時,若是不用考慮這些線程在運行時環境下的調度和交替運行,而且不須要額外的同步及在調用方代碼沒必要作其餘的協調,這個類的行爲仍然是正確的,那麼這個類就是線程安全的。徹底由線程安全的類構成的程序不必定就是線程安全的。

/*
 * 以下代碼所示: 
 * 雖然Vector是一個線程安全的類,可是對於由Vector線程安全的方法組成上面的邏輯,顯然不是線程安全的。 
 * 固然,由線程不安全的類構成的程序,也不必定不是線程安全的。
 */

Vector v;
 
if(!v.contains(o)){
v.add(o);
}

/*
 * 微妙的可見性:
 * 當變量的讀入寫出被不一樣的線程共享時,必須使用同步。若不使用同步將有可能發生與直覺截然不同的錯誤。
 */
public class TestVisiable {

    static int x = 0, y = 0;
    static int b = 0, a = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                a = 1;
                x = b;
            }
        });
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                b = 1;
                y = a;
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(x + " " + y);
    }
}

/*
 * 因爲沒有同步,程序運行的可能結果爲(1,0) (0,1),(1,1),然而,還有可能爲(0,0),難以想象吧。 
 * 1.這是因爲爲了加快併發執行,JVM內部採用了重排序的機制致使。
 * 2.最低限的安全性在java中,java存儲模型要求java變量的獲取和存儲都是原子操做,可是有兩種類型的變量是比較特殊的,沒有申明爲volitale的long和double變量。 
 * 緣由在於:
 * 1.在java中long和double變量爲64位,jvm容許將64位的讀寫操做劃分爲兩個32位的操做。
 * 2.當多個線程同時讀寫非volatile的long或者double類型數據時,將有可能獲得數據是一個數的高32位和另外一個數的低32位組成的數。
 * 3.這樣的結果顯然是徹底不對的。所以對應long或double類型的數據用於多線程共享時,必須申明加上volatile或者進行同步。
 */

3、CountDownLatch用法

  在jdk API中以下描述:一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。用給定的計數初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。

  CountDownLatch 是一個通用同步工具,它有不少用途。將計數 1 初始化的 CountDownLatch 用做一個簡單的開/關鎖存器,或入口:在經過調用 countDown() 的線程打開入口前,全部調用 await 的線程都一直在入口處等待。

/*
 * 下面給出一個詳細的應用場景 百米賽跑:
 * 比賽共有10名運動員參與,10名運動在起跑線上統一等待號令員發起起跑的號令,經過終點時號令員統計該運動員的時間,當全部運動員都跑到終點時,報告每一個人的成績。
 * 這裏實際上就能夠採用CountDownLatch來解決起跑線上設置一個CountDownLatch,當號令員沒有發起起跑號令時,全部運動員都在起跑線上等待。
 * 在終點設置一個CountDownLatch,起跑後號令員一直等待,當全部運動員都經過終點後,它會報告成績。 用代碼實現以下:
 */

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class Commander {

    private final CountDownLatch     startSignal;                            // 起跑信號
    private final CountDownLatch     endSignal;                              // 終點信號
    private Long                     startTime;
    private final Map<Integer, Long> scoreMap = new HashMap<Integer, Long>();

    public void waitStart() {
        try {
            startSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void start() {
        startTime = System.currentTimeMillis();
        startSignal.countDown();
        try {
            endSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void reach(int id) {
        endSignal.countDown();
        scoreMap.put(id, System.currentTimeMillis() - startTime);// 統計時間
    }

    public Commander(int num){
        startSignal = new CountDownLatch(1);
        endSignal = new CountDownLatch(num);
    }

    public static void main(String[] args) {
        int runnerNum = 10;
        Commander c = new Commander(runnerNum);
        for (int i = 0; i < runnerNum; i++) {
            new Thread(new Runner(i + 1, c)).start();
        }
        c.start(); // 發起號令
        for (Integer i : c.scoreMap.keySet()) {
            System.out.println(i + "號運動員,耗時" + c.scoreMap.get(i));
        }
    }
}

class Runner implements Runnable {

    Commander commander;
    int       id;

    public Runner(int id, Commander commander){
        this.id = id;
        this.commander = commander;

    }

    @Override
    public void run() {
        commander.waitStart();
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        commander.reach(id);
    }
}

4、CyclicBarrier使用

  CyclicBarrier在jdk中描述:一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環的 barrier。

  實際上CyclicBarrier相似於CountDownLatch也是個計數器。不一樣的是,當線程調用await方法後,必須全部線程都到達了,才能分別進入後面的執行過程。一旦有一個線程未到達,全部線程都會等待,有點像一部電影的名字《一個都不能少》。

/*
 * 考慮一個應用場景: 
 * 老師帶領學生去春遊,下午回來時,須要整隊,而後統一坐車回去。 
 * 在這裏,老師和已經到達的學生,必須等待全部學生歸隊後才能回去。 用代碼實現以下:
 */

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Teacher {

    private final CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {

                                            @Override
                                            public void run() {
                                                System.out.println("全部學生已經歸隊");
                                            }
                                        });

    public void comeBack(int i) {
        System.out.println(i + "已經歸隊。");
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Teacher t = new Teacher();
        for (int i = 0; i < 20; i++) {
            new Thread(new Student(i + 1, t)).start();
        }

    }
}

class Student implements Runnable {

    private final int     id;
    private final Teacher teacher;

    public Student(int id, Teacher teacher){
        this.id = id;
        this.teacher = teacher;
    }

    @Override
    public void run() {
        try {
            Thread.sleep((long) (Math.random() * 1000));
            teacher.comeBack(id);
            System.out.println(id + "上車");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

5、BlockingQueue 學習

  從名字上看,BlockingQueue是阻塞隊列的意思。這個隊列主要提供下面的功能:

  1.阻塞隊列提供了可阻塞的take和put方法,另外可定時的poll和offer實際原理也是同樣的。

  2.若是BlockingQueue是空的,從BlockingQueue取東西的操做將會被阻斷進入等待狀態,直到BlockingQueue進了東西纔會被喚醒,一樣,若是BlockingQueue是滿的,任何試圖往裏存東西的操做也會被阻斷進入等待狀態,直到BlockingQueue裏有空間時纔會被喚醒繼續操做。

  阻塞隊列有通常又爲兩類:無限阻塞隊列和有限阻塞隊列。有限阻塞隊列中,當隊列滿時,調用put方法將會阻塞;而無限阻塞隊列中,put方法是不會阻塞的。很顯然這個隊列的一種最經常使用的場景就是:生產者-消費者 模式。

/* 它有如下具體實現類:
 * ArrayBlockingQueue:採用數組做爲存儲隊列的阻塞隊列,這個隊列採用FIFO的方式管理數據
 * LinkedBlockingQueue:採用鏈式結構做爲存儲隊列,一樣它也採用FIFO的方式管理數據
 * PriorityBlockingQueue:採用基於優先級堆的極大優先級隊列做爲存儲隊列。
 * SynchronousQueue:特殊的BlockingQueue,其中每一個 put 必須等待一個 take,反之亦然。
 * DelayQueue:這是一個無限阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部。
 * 
 * 經過查看源代碼能夠看到這個類內部是採用PriorityQueue做爲存儲隊列
 * DelayQueue的一些經常使用的場景
 * a) 關閉空閒鏈接。服務器中,有不少客戶端的鏈接,空閒一段時間以後須要關閉之。
 * b) 緩存。緩存中的對象,超過了空閒時間,須要從緩存中移出。
 * c) 任務超時處理。在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求。
 * */

6、任務執行器Executor

  記得在大學的時候,有一次寫程序時,須要建立不少的線程來處理各類socket請求,因而寫了一個線程類,每出現一個socket請求,就建立一個線程。後來,老師指出,每次建立線程的開銷比較大,能夠將線程與具體的業務邏輯分離開來,而後用一個隊列,保存必定量的線程,每次須要的時候就去取,不用的時候就還回去,這樣能夠循環使用,避免重複建立線程的開銷,因而乎,對代碼進行重構,寫了大段的代碼,發現之後須要用到多線程的地方均可要用它,後來在網上找資料才知道,那叫線程池。

  jdk1.5的升級,給咱們帶來一個很特殊的包java.util.concurrent,翻閱API,能夠看到jdk中已經封裝了線程池,簡單兩行即可實現。這個包中提供了Executor的接口,接口定義以下:

void execute(Runnable command);

傳入一個runnable接口的實現,它便自動爲咱們建立線程和執行,任務的提交者不再用爲了併發執行,本身寫一大段代碼來建立並管理各類線程。擴展了Executor的有ExecutorServiceScheduledExecutorService,AbstractExecutorServiceScheduledThreadPoolExecutorThreadPoolExecutor.因爲Runnable接口中只提供了一個不帶返回值run方法,所以當任務須要返回值時,Executor就不能知足需求了,因而出現了ExecutorService,這個接口繼承了Executor,對提交任務的接口進行了擴展,引入了Callable接口,該接口定義以下:

public interface Callable<V> {
  V call() throws Exception;
}

同時接口將任務執行過程進行管理,分爲三個狀態,提交,shutdown,terminate。在AbstractExecutorService中能夠看到submit(Callable c)的實現,實際上它會先建立一個Future對象,而後再調用execute(Runnable command)方法,執行任務。能夠很明顯的知道Future確定是繼承了Runnable接口。經過Future接口,咱們能夠獲取由call方法調用的返回值。Executor接口的兩個具體實現是ThreadPoolExecutor和ScheduledThreadPoolExecutor,經過名字能夠看出,ThreadPoolExecutor是經過採用線程池來執行每一個提交的任務,ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,主要用於知足延遲後運行任務,或者按期執行任務的需求。雖然,咱們能夠經過直接調用ThreadPoolExecutor和ScheduledThreadPoolExecutor的構造函數生成Executor,可是不少狀況下,沒有這個必要,由於jdk爲咱們做了簡化工做,經過Executors這個工廠類,能夠只需傳入一些簡單的參數,即可以獲得咱們須要的Executor對象。

7、CompletionService學習

  若在採用Executor執行任務時,可用經過採用Future來獲取單個任務執行的結果,在Future中提供了一個get方法,該方法在任務執行返回以前,將會阻塞。當向Executor提交批處理任務時,而且但願在它們完成後得到結果,若是用FutureTask,你能夠循環獲取task,並用future.get()去獲取結果,若沒有完成則阻塞,這對於對任務結果須要分別對待的時候是可行的。可是若全部task產生的結果均可以被同等看待,這時候採用前面這樣的方式顯然是不可行了,由於若當前的task沒有完成,然後面的其它task已經完成,你也得等待,這個實效性不高。顯然不少時候,統一組task產生的結果都應該是沒有區別的,也就是知足上述第二種狀況。這個時候咋辦呢?jdk爲咱們提供了一個很好的接口CompletionService,這個接口的具體實現類是ExecutorCompletionService。該類中定義下面三個屬性:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

executor由構造函數傳入,aes只是用於生成Future對象。特別要注意是completionQueue。它維護了一個保存Future對象的BlockingQueue。當這個Future對象狀態是結束的狀態的時候,也就是task執行完成以後,會將它加入到這個Queue中。究竟是在哪裏將完成的Future加入到隊列裏面的呢?又是怎麼知道task是何時結束的呢?在ExecutorCompletionService中定義了一個QueueingFuture類,該類的實現:

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

能夠看到在done方法中,它會把當前的task加入到阻塞隊列中。追蹤done方法能夠看到,該方法定義在FutureTask中,默認實現爲空,從註釋能夠看出,當Future的狀態轉爲isDone的時候,就會調用該方法。調用端在調用CompletionService的take方法時,實際上調用的是BlockingQueue的take方法,由前面的學習中,咱們知道,當隊列中有內容時,該隊列會當即返回隊列中的對象,當隊列爲空時,調用線程將會阻塞。而只要有任務完成,調用線程就會跳出阻塞,得到結果。

8、ConcurrentHashMap學習

  java中提供對HashMap是咱們使用得比較多的一個類,單該類是非線程安全的,若處於多線程環境中,則須要經過synchronized關鍵字進行同步(經過查看Collections.synchronizedMap方法,能夠知道,實際上該方法對實現也是經過synchronized關鍵字進行同步控制),雖然它能保證線程安全,可是,因爲須要對整個map進行加鎖,這樣作的併發性能每每不是很理想,尤爲是map中數據量比較大的時候。jdk1.5以後,java.util.concurrent包中提供了ConcurrentHashMap,一方面,該類本身保證了線程安全性,另外一方面,該類也提供了一些複合操做的原子性接口。

  ConcurrentHashMap與HashMap同樣,一樣是哈希表,可是採用不一樣對鎖機制--分離鎖,即採用不一樣的鎖來同步不一樣的數據塊,以減小對鎖對競爭。   在ConcurrentHashMap內部採用了一個包含16個鎖對象對數組,每一個鎖負責同步hash Bucket的1/16,bucket中的每一個對象經過它的hashCode計算獲得它所在鎖對象。假設hash算法的實現可以提供合理的擴展性,而且關鍵字可以以統一的方式訪問,這會將對於鎖的請求減小到原來的1/16.這種基於分離鎖設計的技術實現可以使得ConcurrentHashMap支持16個併發的請求。

9、Lock使用

  在java5.0之前都是採用synchronized關鍵字進行同步控制,全部對象都自動含有單一的鎖,JVM負責跟蹤對象被加鎖的次數。若是一個對象被解鎖,其計數變爲0。在任務(線程)第一次給對象加鎖的時候,計數變爲1。每當這個相同的任務(線程)在此對象上得到鎖時,計數會遞增。只有首先得到鎖的任務(線程)才能繼續獲取該對象上的多個鎖。每當任務離開一個synchronized方法,計數遞減,當計數爲0的時候,鎖被徹底釋放,此時別的任務就可使用此資源。因爲這些鎖是由JVM來控制,所以也叫隱式鎖。

  在jdk5.0之後,提供了顯示鎖,即Lock,能夠說是對隱式鎖的功能的擴展,主要有兩個。一個是ReentrantLock,另外一個是ReentrantReadWriteLock,前者是普通重入鎖,後者是可重入的讀寫鎖。這些鎖提供了兩種鎖競爭機制:公平競爭和非公平競爭,公平競爭的實現其實是採用一個隊列保存等待的線程,噹噹前線程釋放鎖以後,取出隊頭的線程喚醒,使之能夠獲取鎖。java中Lock接口的定義:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

Lock與synchronized的區別:

1.synchronized是在JVM層面上實現的,不但能夠經過一些監控工具監控synchronized的鎖定,並且在代碼執行時出現異常,JVM會自動釋放鎖。而Lock的釋放必須由程序本身保證,經常使用的寫法是把是把釋放鎖的代碼寫到try{}finally中,在finally塊中釋放鎖。

2.若是使用 synchronized ,若是A不釋放,B將一直等下去,不能被中斷,若是使用Lock,若是A不釋放,可使B在等待了足夠長的時間之後,中斷等待,而幹別的事情。

3.synchronize其實是ReentrantLock和Condition的組合的簡化版

4.相對於synchonized獨佔鎖,ReentrantReadWriteLock經過分離讀鎖和寫鎖,提供可共享的讀鎖提升讀併發性能。

相關文章
相關標籤/搜索