Java多線程&高併發

1、線程安全性

定義:當多個線程訪問某個類時,無論運行時環境採用 何種調度方式,或者這些線程將如何交替執行,而且在主調代碼中 不須要任何額外的同步或協同,這個類都能表現出 正確的行爲,那麼就稱這個類是線程安全的。

1. 原子性:提供了互斥訪問,同一時刻只能有一個線程來對它進行訪問。html

Atomic包:編程

  1. AtomicXXX:CAS、Unsafe.compareAndSwapInt
  2. AtomicLong、LongAdder
  3. AtomicReference、AtomicReferenceFieldUpdater
  4. AtomicStampReference:CAS的ABA問題

原子性 - synchronized(同步鎖)
修飾代碼塊:大括號括起來的代碼,做用於調用的對象
修飾方法:整個方法,做用於調用的對象
修飾靜態方法:整個靜態方法,做用於全部對象
修飾類:括號括起來的部分,做用於全部類
原子性 - 對比
synchronized:不可中斷鎖,適合競爭不激烈,可讀性好
Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態
Atomic:競爭激烈時能維持常態,比Lock性能好;只能同步一個值api

2. 可見性:一個線程對主內存的修改能夠及時的被其餘線程觀察到。數組

致使共享變量在線程見不可見的緣由緩存

  1. 線程交叉執行
  2. 衝排序結合線程交叉執行
  3. 共享變量更新後的值沒有在工做內存與主內存之間急事更新

synchronized、volatile
JMM關於synchronized的兩條規定:安全

  1. 線程解鎖前,必須把共享變量的最新制刷新到主內存
  2. 線程加鎖前,將清空工做內存中共享變量的值,從而使用共享變量時須要從主內存中從新讀取最新的值(注意:加鎖與解鎖是同一把鎖

volatile - 經過加入內存屏障禁止重排序優化來實現多線程

  1. 對volatile變量寫操做時,會在寫操做後加入一條store屏障指令,將本地內存中的共享變量值刷新到主內存
  2. 對volatile變量讀操做時,會在讀操做前加入一條load屏障指令,從主內存中讀取共享變量
  3. volatile變量在每次被線程訪問時,都強迫從主內存中讀取該變量的值,而當變量的值發生變化時,又會強迫線程將該變量最新的值強制刷新到主內存,這樣一來,任什麼時候候不一樣的線程總能看到該變量的最新值

3. 有序性:一個線程觀察其餘線程中的指令執行順序,因爲指令重排序的存在,該觀察結果通常雜亂無序。併發

Java內存模型中,容許編譯器和處理器對指令進行重排序,可是重排序過程不會影響到單線程程序的執行,卻會影響到多線程併發執行的正確性。volatile、synchronized、Lock。
【volatile變量規則】:對一個變量的寫操做先行發生於後面對這個變量的讀操做。(若是一個線程進行寫操做,一個線程進行讀操做,那麼寫操做會先行於讀操做。)
【傳遞規則】:若是操做A先行於操做B,而操做B又先行於操做C,那麼操做A就先行於操做C。
【線程啓動規則】:Thread對象的start方法先行發生於此線程的每個動做。
【線程中斷規則】:對線程interrupt方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生。
【線程終結規則】:線程中全部的操做都先行發生於線程的終止檢測,咱們能夠經過Thread.join()方法結束、Thread.isAlive()方法的返回值手段檢測到線程已經終止執行。
【對象終結規則】:一個對象的初始化完成先行發生於他的finalize()方法的開始。

2、發佈對象

發佈對象:使一個對象可以被當前範圍以外的代碼所用。
對象溢出:一種錯誤的發佈。當一個對象尚未構造完成時,就使它被其餘線程所見。

3、安全發佈對象

在靜態初始化函數中初始化一個對象
將對象的引用保存到volatile類型域或者AtomicReference對象中
將對象的引用保存到某個正確構造對象的final類型域中
將對象的引用保存到一個由鎖保護的域中
/**
 * 懶漢模式
 * 雙重同步鎖單例模式
 * @author Guo
 *
 */
public class SingletonExample1 {
    
    private SingletonExample1(){
        
    }
    
    // volatile禁止指令重排
    private volatile static SingletonExample1 instance = null;
    
    public static SingletonExample1 getInstance(){
        if(instance == null){
            synchronized(SingletonExample1.class){
                if(instance == null){
                    instance = new SingletonExample1();
                }
            }
        }
        return instance;
    }

}

4、避免併發兩種方式

  1. 不可變對象
  2. 線程封閉

線程封閉: 把對象封裝到一個線程裏,只有這一個線程能夠看到這個對象,即便這個對象不是線程安全也不會出現任何線程安全問題,由於只在一個線程裏框架

  1. 堆棧封閉局部變量,無併發問題。棧封閉是咱們編程當中遇到的最多的線程封閉。什麼是棧封閉呢?簡單的說就是局部變量。多個線程訪問一個方法,此方法中的局部變量都會被拷貝一分兒到線程棧中。因此局部變量是不被多個線程所共享的,也就不會出現併發問題。因此能用局部變量就別用全局的變量,全局變量容易引發併發問題。
  2. ThreadLocal線程封閉:比較推薦的線程封閉方式。
    【ThreadLocal結合filter完成數據保存到ThreadLocal裏,線程隔離。】經過filter獲取到數據,放入ThreadLocal, 當前線程處理完以後interceptor將當前線程中的信息移除。使用ThreadLocal是實現線程封閉的最好方法。ThreadLocal內部維護了一個Map,Map的key是每一個線程的名稱,而Map的值就是咱們要封閉的對象。每一個線程中的對象都對應着Map中一個值,也就是ThreadLocal利用Map實現了對象的線程封閉

5、線程不安全類與寫法

【線程不安全】:若是一個類類對象同時能夠被多個線程訪問,若是沒有作同步或者特殊處理就會出現異常或者邏輯處理錯誤。
【1. 字符串拼接】:
StringBuilder(線程不安全)、
StringBuffer(線程安全)
【2. 日期轉換】:
SimpleDateFormat(線程不安全,最好使用局部變量[堆棧封閉]保證線程安全)
JodaTime 推薦使用(線程安全)
【3. ArrayList、HashSet、HashMap等Collections】:
ArrayList(線程不安全)
HashSet(線程不安全)
HashMap(線程不安全)
【**同步容器**synchronized修飾】
Vector、Stack、HashTable
Collections.synchronizedXXX(List、Set、Map)
【**併發容器** J.U.C】
ArrayList -> CopyOnWriteArrayList:(讀時不加鎖,寫時加鎖,避免複製多個副本出來將數據搞亂)寫操做時複製,當有新元素添加到CopyOnWriteArrayList中時,先從原有的數組中拷貝一份出來,在新的數組上進行寫操做,寫完以後再將原來的數組指向新的數組。

clipboard.png

HashSet、TreeSet -> CopyOnWriteArraySet、ConcurrentSkipListSet
HashMap、TreeMap -> ConcurrentHashMap、ConcurrentSkipListMap
相比ConcurrentHashMap,ConcurrentSkipListMap具備以下優點:jvm

  1. ConcurrentSkipListMap的存取速度是ConcurrentSkipListMap的4倍左右
  2. ConcurrentSkipListMap的key是有序的
  3. ConcurrentSkipListMap支持更高的併發(它的存取時間和線程數幾乎沒有關係,更高併發的場景下越能體現出優點)

6、安全共享對象策略 - 總結

  1. 線程限制:一個被線程限制的對象,由線程獨佔,而且只能被佔有它的線程修改
  2. 共享只讀:一個共享只讀的對象,在沒有額外同步的狀況下,能夠被多個線程併發訪問,可是任何線程都不能修改它
  3. 線程安全對象:一個線程安全的對象或者容器,在內部經過同步機制來保證線程安全,因此其餘線程無需額外的同步就能夠經過公共接口隨意訪問它
  4. 被守護對象:被守護對象只能經過獲取特定鎖來訪問

7、J.U.C 之 AQS

7.一、 AQS

AQS:AbstractQueneSynchronizer

  1. 使用Node實現FIFO隊列,能夠用於構建鎖或者其餘同步裝置的基礎框架
  2. 利用int類型表示狀態
  3. 使用方法是繼承
  4. 子類經過繼承並經過實現它的方法管理其狀態{ acquire和release }的方法操縱狀態
  5. 能夠同時實現排它鎖和共享鎖模式(獨佔、共享)

7.二、 AQS的同步組件以下:

7.2.一、CountDownLatch:閉鎖,經過計數來保證線程是否一直阻塞.
CountDownLatch是經過一個計數器來實現的,計數器的初始值爲線程的數量。每當一個線程完成了本身的任務後,計數器的值就會減1。當計數器值到達0時,它表示全部的線程已經完成了任務,而後在閉鎖上等待的線程就能夠恢復執行任務。構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值。

與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。

其餘N 個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。

解釋一下CountDownLatch概念?
`CountDownLatch`和 `CyclicBarrier`的不一樣之處?
給出一些CountDownLatch使用的例子?
 CountDownLatch類中主要的方法?

clipboard.png

public class CountDownLatchExample1 {
    
    // 線程數
    private final static int threadCount = 200;
    
    public static void main(String[] args) throws InterruptedException{
        // 使用線程池進行調度
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    System.out.println("exception:" + e);
                }finally{
                    countDownLatch.countDown(); // 計數器減一
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        System.out.println("===finished===");
        exec.shutdown();
    }
    
    
    private static void test(int threadNum) throws InterruptedException{
        Thread.sleep(100);
        System.out.println("threadNum:" + threadNum);
    }
    

}
7.2.二、 Semaphore(信號量):能夠控制同一時間併發線程的數目
主要函數:acquire、release、tryAcquire
public class SemaphoreExample1 {
    
    // 線程數
    private final static int threadCount = 20;
    
    public static void main(String[] args) throws InterruptedException{
        // 使用線程池進行調度
        ExecutorService exec = Executors.newCachedThreadPool();
        //併發控制(容許併發數20)
        final Semaphore semaphore = new Semaphore(3);
        
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if(semaphore.tryAcquire(5, TimeUnit.SECONDS)){
                        test(threadNum);
                        semaphore.release();
                    }
                    /** 多個許可:在代碼中一共有10個許可,每次執行semaphore.acquire(5);
                     * 代碼時耗費掉5個,因此20/5=4,
                     * 說明同一時間只有4個線程容許執行acquire()和release()之間的代碼。
                     * */
//                    semaphore.acquire(3); // 獲取許可
//                    test(threadNum);
//                    semaphore.release(3); // 釋放許可
                } catch (Exception e) {
                    System.out.println("exception:" + e);
                }finally{
                    countDownLatch.countDown(); // 計數器減一
                }
            });
        }
//        countDownLatch.await(100, TimeUnit.MILLISECONDS);
        System.out.println("===finished===");
        exec.shutdown();
    }
    
    
    private static void test(int threadNum) throws InterruptedException{
        System.out.println("threadNum:" + threadNum);
        Thread.sleep(1000);
    }
    

}
7.2.三、 CyclicBarrier:能夠完成多個線程之間相互等待,只有當每一個線程都準備就緒後,才能各自繼續往下執行
應用場景:須要全部的子任務都完成時,才執行主任務,這個時候就能夠選擇使用CyclicBarrier。

簡單理解【`人滿發車`】:
長途汽車站提供長途客運服務。
當等待坐車的乘客到達20人時,汽車站就會發出一輛長途汽車,讓這20個乘客上車走人。
等到下次等待的乘客又到達20人是,汽車站就會又發出一輛長途汽車。
public class CyclicBarrierExample1 {
    
    // 線程數
    private final static int threadCount = 10;
    
    // 屏障的線程數目 5 
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        System.out.println("===continue===");
    });
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool(); 
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(500);
            executorService.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        
        
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        System.out.println("===" + threadNum + " is ready.");
        try{
            barrier.await(2000, TimeUnit.MILLISECONDS);
        }catch(Exception e){
            System.out.println("e:"+e);
        }
        System.out.println("===" + threadNum + " continue");
    }

}
7.2.四、ReentrantLock
1. api:
    - lock()
    - unlock()
    - tryLock()
private static Lock lock = new ReentrantLock();
   private static void test(int threadNum){
           lock.lock();
           try{
               count++;
           }finally{
               lock.unlock();
           }
       }
2. ReentrantLock和synchronized的區別
    - 1. `可重入性`
    - 2. `鎖的實現`:synchronized是jvm實現,ReentrantLock是jdk實現
    - 3. `性能區別`
    - 4. `功能方面的區別`
3. ReentrantLock獨有的功能
    - 1. 可指定是公平鎖仍是非公平鎖,synchronized只能是非公平鎖(公平鎖:先等待的線程先得到鎖)
    - 2. 提供了一個Condition類,能夠分組喚醒須要喚醒的線程
    - 3. 提供可以中斷等待鎖的線程的機制,lock.lockInterruptibly()
4. ReentrantReadWriteLock
5. StampedLock
6. 鎖的使用
   - 當只有少許競爭者線程的時候,`synchronized`是一個很好的通用的鎖的實現(synchronized不會引起死鎖,jvm會自動解鎖)
   - 競爭者線程很多,可是線程增加的趨勢是能夠預估的,這時候使用`ReentrantLock`是一個很好的通用的鎖的實現
7.2.五、Condition
public class LockExample3 {
public static void main(String[] args){
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    int u=1;
    
    
    new Thread(() -> {
        try{
            reentrantLock.lock();
            System.out.println("wait signal"); // 1
            condition.await();
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        System.out.println("get signal");
        reentrantLock.unlock();
    }).start();
    
    new Thread(() -> {
        reentrantLock.lock();
        System.out.println("get lock");
        try{
            Thread.sleep(3000);
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        condition.signalAll();
        System.out.println("send signal");
        reentrantLock.unlock();
    }).start();
    
    
}

}
7.2.六、FutureTask
建立線程兩種方式繼承Thread,實現Runnable接口,這兩種方式,在任務執行完畢以後獲取不到執行結果
    FutureTask、Callable能夠獲取到執行結果
    1. Callable和Runnable對比
    2. Future接口
    3. FutureTask
    ```
    public static void main(String[] args) throws InterruptedException, ExecutionException {
    FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("do something in callable...");
            Thread.sleep(3000);
            return "Done";
        }
    });
    
    new Thread(futureTask).start();
    System.out.println("do something in main...");
    Thread.sleep(1000);
    String result = futureTask.get();
    System.out.println("result:"+result);
}

}

7.2.七、Fork/Join框架:將大模塊切分紅多個小模塊進行計算

8、線程池

初始化好線程池實例以後,將任務丟進去等待調度執行。

8.一、Thread弊端

  1. 每次new Thread都要新建對象,性能差
  2. 線程缺少統一管理,可能無限制的新建線程,相互競爭,有可能佔用過多的系統資源致使死機或者OOM
  3. 缺乏更多功能,如更多執行,按期執行,線程中斷

8.二、線程池的好處

  1. 能夠重用存在的線程,減小對象的建立、消亡的開銷,性能佳
  2. 能夠有效的控制最大併發數,提供系統資源利用率,同時能夠避免過多的資源競爭,避免阻塞
  3. 提供定時執行、按期執行、單線程、併發數控制等功能
  4. ThreadPoolExecutor的初始化參數】
    corePoolSize:核心線程數量
    maximumPoolSize:縣城最大線程數
    workQueue:阻塞隊列,存儲等待執行的任務,很重要,會對線程池運行過程產生重大影響
    keepAliveTime:線程沒有任務執行時,最多保持多久時間終止
    unit:keepAliveTime的時間單位
    hreadFactory:線程工廠,用來建立線程
    rejectHandler:當拒絕處理任務時的策略

線程池-ThreadPoolExecutor狀態
clipboard.png

線程池-ThreadPoolExecutor方法

1. execute():提交任務,交給線程池執行
2. submit():提交任務可以返回執行結果execute + Future
3. shutdown():關閉線程池,等待任務都執行完
4. shutdownNow():關閉線程池,不等待任務執行完
5. getTaskCount():線程池已執行和未執行的任務總數
6. getCompletedTaskCount():已完成的任務總數
7. getPoolSize():線程池當前的線程數量
8. getActiveCount:當前線程池中正在執行任務的線程數量

8.三、線程池 - Executors框架(建立線程池)

  1. Executors.newCachedThreadPool:建立一個可緩存的線程池,若是線程池長度超過了處理的須要能夠靈活回收空閒線程,若是沒有能夠回收的,那麼就新建線程
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 往線程池中聽任務
        for (int i = 0; i < 10; i++) {
            final int index = i; // 任務的序號
            executorService.execute(() -> {
                System.out.println("===task:"+index);
            });
        }
        executorService.shutdown(); // 關閉線程池
    }
  1. Executors.newFixedThreadPool:建立的是一個定長的線程池,能夠控制線程的最大併發數,超出的線程會在隊列中等待
  2. Executors.newScheduledThreadPool:建立的也是定長線程池,支持定時以及週期性的任務執行
public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        
        // 往線程池中聽任務
        executorService.scheduleAtFixedRate(() -> {
            log.info("===sechedule run");
        }, 1, 3, TimeUnit.SECONDS); // 延遲一秒,每隔三秒執行任務
        
        
        executorService.schedule(() -> {
            log.info("===sechedule run");
        }, 3, TimeUnit.SECONDS);
        
        executorService.shutdown(); // 關閉線程池
    }
  1. Executors.newSingleThreadExecutor:建立的是一個單線程化的線程池,會用惟一的一個工做線程來執行任務,保證全部任務按照指令順序執行(指令順序能夠指定它是按照先入先出,優先級執行)

newSingleThreadExecutor打印結果是按照順序輸出
clipboard.png

8.四、線程池 - 合理配置

1. CPU密集型任務,就須要儘可能壓榨CPU,參考能夠設置爲NCPU+1
2. IO密集型任務,參考能夠設置爲2*NCPU
> NCPU = CPU的數量
> UCPU = 指望對CPU的使用率 0 ≤ UCPU ≤ 1
> W/C = 等待時間與計算時間的比率
> 若是但願處理器達到理想的使用率,那麼線程池的最優大小爲:
> 線程池大小=NCPU *UCPU(1+W/C)

https://www.cnblogs.com/super...
https://www.cnblogs.com/super...

相關文章
相關標籤/搜索