Java併發多線程 - 併發工具類JUC

安全共享對象策略

1.線程限制 : 一個被線程限制的對象,由線程獨佔,而且只能被佔有它的線程修改
2.共享只讀 : 一個共享只讀的對象,在沒有額外同步的狀況下,能夠被多個線程併發訪問,
可是任何線程都不能修改它
3.線程安全對象 : 一個線程安全的對象或則容器,在內部經過同步機制來保證線程安全,
因此其餘線程無需額外的同步就能夠經過公共接口隨意訪問它
4.被守護對象 : 被守護對象只能經過獲取特定的鎖來訪問安全

線程安全 - 同步容器

採用synchronized關鍵字同步,缺點 :併發

  1. 不能完成作到線程安全
  2. 性能差

ArrayLisy -> Vector, Stack
HashMap -> HashTable (key、value不能爲null)
Collections.synchronizedXXX(List、Set、Map)框架

線程安全 - 併發容器 J.U.C

ArrayList -> CopyOnWriteArrayList
HashSet、TreeSet -> CopyOnWriteArraySet ConcurrentSkipListSet
HashMap、TreeMap -> ConcurrentHashMap ConcurrentSkipListMap工具

AbstractQueuedSynchronizer - AQS性能

  1. 使用Node實現FIFO隊列,能夠用於構建鎖或則其餘同步裝置的基礎框架
  2. 利用一個int類型表示狀態
  3. 使用方法是基礎
  4. 子類經過繼承並經過實現它的方法管理其狀態 { acquire 和 release} 的方法操縱狀態
  5. 能夠同時實現排他鎖和共享鎖模式(獨佔、共享)

經常使用類

CountDownLatch
Semaphore
CyclicBarrier
ReentrantLock
Condition
FutureTaskui

CountDownLacth

CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程執行完後再執行。例如,應用程序的主線程但願在負責啓動框架服務的線程已經啓動全部框架服務以後執行。線程

CountDownLatch是經過一個計數器來實現的,計數器的初始化值爲線程的數量。每當一個線程完成了本身的任務後,計數器的值就相應得減1。當計數器到達0時,表示全部的線程都已完成任務,而後在閉鎖上等待的線程就能夠恢復執行任務。調試

@Self4j
public class CountDownLatchExample {

    private final  static int threadCount = 200;
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        final CountDownLatch lacth = new CountDownLatch(5);
        
        for (int i = 0; i < 1000; i++) {
            exec.excute( () -> {
            final int threadNum  = i;
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                // latch遞減
                lacth.countDown();
            }
            });
        }
        // 等待latch計數器爲0,則繼續往下執行
        latch.await();
        // latch的await方法也能夠傳入等待時間,等到等待時間後無論有沒完成計數都往下執行
        // latch.await( 10, TimeUnit.MILLISECONDS);
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

Semaphore

Semaphore(int permits):構造方法,建立具備給定許可數的計數信號量並設置爲非公平信號量。
Semaphore(int permits,boolean fair):構造方法,當fair等於true時,建立具備給定許可數的計數信號量並設置爲公平信號量。
void acquire():今後信號量獲取一個許可前線程將一直阻塞。
void acquire(int n):今後信號量獲取給定數目許可,在提供這些許可前一直將線程阻塞。
void release():釋放一個許可,將其返回給信號量。就如同車開走返回一個車位。
void release(int n):釋放n個許可。
int availablePermits():獲取當前可用的許可數。
boolean tryAcquire():僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
boolean tryAcquire(int permits):僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。code

boolean tryAcquire(int permits,
                          long timeout,
                          TimeUnit unit)
                   throws InterruptedException

若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被 中斷,則今後信號量獲取給定數目的許可。對象

@Self4j
public class SemaphoreExample {

    private final  static int threadCount = 200;

    public static void main(String[] arg) {

        ExecutorService exec = Executors.newCachedThreadPool(); 
    
        final Semaphore semaphore = new Semaphore(3);
    
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () )-> {
            final int threadNum  = i;
            try {
                // tryAcquire會嘗試去獲取一個信號量,若是獲取不到
                // 則什麼都不會發生,走接下來的邏輯
                // if (semaphore.tryAcquire(1)) {
                //    test(i);
                //    semaphore.release();//釋放一個信號量
                // }
                semaphore.acquire();//獲取一個信號量
                test(i);
                semaphore.release();//釋放一個信號量
            } catch (Exception e) {
                log.error("exception", e);
            } 
            });
        }
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

CyclicBarrier

一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。

CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。

CyclicBarrier(int parties)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。

int await()
在全部 參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。

int await(long timeout,
                 TimeUnit unit)
          throws InterruptedException,
                 BrokenBarrierException,
                 TimeoutException

在全部 參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。

boolean isBroken() : 查詢此屏障是否處於損壞狀態。

void reset() :
將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個 BrokenBarrierException。注意,在因爲其餘緣由形成損壞 以後,實行重置可能會變得很複雜;此時須要使用其餘方式從新同步線程,並選擇其中一個線程來執行重置。與爲後續使用建立一個新 barrier 相比,這種方法可能更好一些。

int getNumberWaiting() :返回當前在屏障處等待的參與者數目。此方法主要用於調試和斷言。

@Self4j
public class CyclicBarrierExample {

    private final  static int threadCount = 200;
    
    private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(7, 
        () -> {
        log.info("callback is running !");
        }
    );
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () -> {
                final int threadNum  = i;
                try {
                    race(i);
                } catch (Exception e) {
                    log.error("exception", e);
                } 
            });
        }
        log.info("finished");
        
        exec.shutdown();
    }
    
    public static void race(int i)  throw Exception{
        log.info("thread {} is ready", i);
        cyclicBarrier.await();
        log.info("thread {} is continue", i);
    }
}
相關文章
相關標籤/搜索