阿里P7告訴你什麼是java併發包、線程池、鎖

併發包

java.util.concurrent從jdk1.5開始新加入的一個包,致力於解決併發編程的線程安全問題,使用戶可以更爲快捷方便的編寫多線程狀況下的併發程序。java

同步容器

同步容器只有包括Vector和HashTable,相比其餘容器類只是多用了Synchronize的技術數據庫

Vector與ArrayList區別

1.ArrayList是最經常使用的List實現類,內部是經過數組實現的,它容許對元素進行快速隨機訪問。數組的缺點是每一個元素之間不能有間隔,當數組大小不知足時須要增長存儲能力,就要講已經有數組的數據複製到新的存儲空間中。當從ArrayList的中間位置插入或者刪除元素時,須要對數組進行復制、移動、代價比較高。所以,它適合隨機查找和遍歷,不適合插入和刪除。編程

2.Vector與ArrayList同樣,也是經過數組實現的,不一樣的是它支持線程的同步,即某一時刻只有一個線程可以寫Vector,避免多線程同時寫而引發的不一致性,但實現同步須要很高的花費,所以,訪問它比訪問ArrayList慢數組

注意: Vector線程安全、ArrayList線程不安全緩存

HasTable與HasMap區別

1.HashMap不是線程安全的 安全

HastMap是一個接口 是map接口的子接口,是將鍵映射到值的對象,其中鍵和值都是對象,而且不能包含重複鍵,但能夠包含重複值。HashMap容許null key和null value,而hashtable不容許。多線程

2.HashTable是線程安全的一個Collection。併發

3.HashMap是Hashtable的輕量級實現(非線程安全的實現),他們都完成了Map接口,主要區別在於HashMap容許空(null)鍵值(key),因爲非線程安全,效率上可能高於Hashtable。app

HashMap容許將null做爲一個entry的key或者value,而Hashtable不容許。dom

HashMap把Hashtable的contains方法去掉了,改爲containsvalue和containsKey。

注意: HashTable線程安全,HashMap線程不安全。

synchronizedMap synchronizedList

Collections.synchronized*(m) 將線程不安全集合變爲線程安全集合

Map m = Collections.synchronizedMap(new HashMap());  List l = Collections.synchronizedList(new ArrayList());

ConcurrentHashMap

ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的HashTable,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。把一個總體分紅了16個段(Segment)也就是最高支持16個線程的併發修改操做。

這也是在重線程場景時減少鎖的粒度從而下降鎖競爭的一種方案。而且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能很是好。

併發容器

CountDownLatch

CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,能夠把它當作是一個計數器,其內部維護着一個count計數,只不過對這個計數器的操做都是原子操做,同時只能有一個線程去操做這個計數器,CountDownLatch經過構造函數傳入一個初始計數值,調用者能夠經過調用CounDownLatch對象的cutDown()方法,來使計數減1;若是調用對象上的await()方法,那麼調用者就會一直阻塞在這裏,直到別人經過cutDown方法,將計數減到0,才能夠繼續執行。

public class Test002 {  public static void main(String[] args) throws InterruptedException {    System.out.println("等待子線程執行完畢...");    CountDownLatch countDownLatch = new CountDownLatch(2);    new Thread(new Runnable() {      @Override      public void run() {        System.out.println("子線程," +Thread.currentThread().getName() + "開始執行...");        countDownLatch.countDown();// 每次減去1        System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");      }    }).start();    new Thread(new Runnable() {      @Override      public void run() {        System.out.println("子線程," + Thread.currentThread().getName() + "開始執行...");        countDownLatch.countDown();        System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");        }      }).start();      countDownLatch.await();// 調用當前方法主線程阻塞 countDown結果爲0, 阻塞變爲運行狀態      System.out.println("兩個子線程執行完畢....");      System.out.println("繼續主線程執行..");    }}

CyclicBarrier

一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。

使用場景

須要全部的子任務都完成時,才執行主任務,這個時候就能夠選擇使用CyclicBarrier。

public class CyclicBarrierTest {  public static void main(String[] args) throws IOException, InterruptedException {    //若是將參數改成4,可是下面只加入了3個選手,這永遠等待下去    //Waits until all parties have invoked await on this barrier.     CyclicBarrier barrier = new CyclicBarrier(3);    ExecutorService executor = Executors.newFixedThreadPool(3);    executor.submit(new Thread(new Runner(barrier, "1號選手")));    executor.submit(new Thread(new Runner(barrier, "2號選手")));    executor.submit(new Thread(new Runner(barrier, "3號選手")));    executor.shutdown();  }}class Runner implements Runnable {  // 一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)  private CyclicBarrier barrier;  private String name;  public Runner(CyclicBarrier barrier, String name) {    super();    this.barrier = barrier;    this.name = name;  }  @Override  public void run() {    try {      Thread.sleep(1000 * (new Random()).nextInt(8));      System.out.println(name + " 準備好了...");      // barrier的await方法,在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。      barrier.await();    } catch (InterruptedException e) {      e.printStackTrace();    } catch (BrokenBarrierException e) {      e.printStackTrace();    }    System.out.println(name + " 起跑!");  }}

Semaphore

Semaphore是計數信號量。Semaphore管理一系列許可證。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。 

Semaphore常常用於限制獲取某種資源的線程數量

需求: 一個廁所只有3個坑位,可是有10我的來上廁所,那怎麼辦?假設10我的的編號分別爲1-10,而且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候須要看看前面3人是否有人出來了,若是有人出來,進去,不然等待。一樣的道理,4-10號也須要等待正在上廁所的人出來後才能進去,而且誰先進去這得看等待的人是否有素質,是否能遵照先來先上的規則。

class Parent implements Runnable {  private String name;  private Semaphore wc;  public Parent(String name,Semaphore wc){    this.name=name;    this.wc=wc;  }  @Override  public void run() {    try {      // 剩下的資源(剩下的茅坑)      int availablePermits = wc.availablePermits();      if (availablePermits > 0) {        System.out.println(name+"天助我也,終於有茅坑了...");      } else {        System.out.println(name+"怎麼沒有茅坑了...");      }      //申請茅坑 若是資源達到3次,就等待      wc.acquire();      System.out.println(name+"終於輪我上廁所了..爽啊");        Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。      System.out.println(name+"廁所上完了...");      wc.release();    } catch (Exception e) {    }  }}public class TestSemaphore02 {  public static void main(String[] args) {    Semaphore semaphore = new Semaphore(3);    for (int i = 1; i <=10; i++) {       Parent parent = new Parent("第"+i+"我的,",semaphore);       new Thread(parent).start();    }  }}

併發隊列

ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最近加入的,該隊列不容許null元素。

add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別) poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。

public class ConcurrentLinkedQueueTest {

  private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

  private static int count = 2; // 線程個數

  //CountDownLatch,一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。

  private static CountDownLatch latch = new CountDownLatch(count);  public static void main(String[] args) throws InterruptedException {    long timeStart = System.currentTimeMillis();    ExecutorService es = Executors.newFixedThreadPool(4);    ConcurrentLinkedQueueTest.offer();    for (int i = 0; i < count; i++) {      es.submit(new Poll());    }    latch.await(); //使得主線程(main)阻塞直到latch.countDown()爲零才繼續執行    System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");    es.shutdown();  }  /**   * 生產   */  public static void offer() {    for (int i = 0; i < 100000; i++) {      queue.offer(i);    }  }  static class Poll implements Runnable {    public void run() {      // while (queue.size()>0) {      while (!queue.isEmpty()) {        System.out.println(queue.poll());      }      latch.countDown();    }  }}

BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:

在隊列爲空時,獲取元素的線程會等待隊列變爲非空。

當隊列滿時,存儲元素的線程會等待隊列可用。 

阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素

ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。

LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表

SynchronousQueue

SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費

public class BlockingQueueTest2 {  /**   *    * 定義裝蘋果的籃子   *    */  public class Basket {    // 籃子,可以容納3個蘋果    BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);    // 生產蘋果,放入籃子    public void produce() throws InterruptedException {      // put方法放入一個蘋果,若basket滿了,等到basket有位置      basket.put("An apple");    }    // 消費蘋果,從籃子中取走    public String consume() throws InterruptedException {      // take方法取出一個蘋果,若basket爲空,等到basket有蘋果爲止(獲取並移除此隊列的頭部)      return basket.take();    }  }  // 定義蘋果生產者  class Producer implements Runnable {    private String instance;    private Basket basket;    public Producer(String instance, Basket basket) {      this.instance = instance;      this.basket = basket;    }    public void run() {      try {        while (true) {          // 生產蘋果          System.out.println("生產者準備生產蘋果:" + instance);          basket.produce();          System.out.println("!生產者生產蘋果完畢:" + instance);          // 休眠300ms          Thread.sleep(300);        }      } catch (InterruptedException ex) {        System.out.println("Producer Interrupted");      }    }  }  // 定義蘋果消費者  class Consumer implements Runnable {    private String instance;    private Basket basket;    public Consumer(String instance, Basket basket) {      this.instance = instance;      this.basket = basket;    }    public void run() {      try {        while (true) {          // 消費蘋果          System.out.println("消費者準備消費蘋果:" + instance);          System.out.println(basket.consume());          System.out.println("!消費者消費蘋果完畢:" + instance);          // 休眠1000ms          Thread.sleep(1000);        }      } catch (InterruptedException ex) {        System.out.println("Consumer Interrupted");      }    }  }  public static void main(String[] args) {    BlockingQueueTest2 test = new BlockingQueueTest2();    // 創建一個裝蘋果的籃子    Basket basket = test.new Basket();    ExecutorService service = Executors.newCachedThreadPool();    Producer producer = test.new Producer("生產者001", basket);    Producer producer2 = test.new Producer("生產者002", basket);    Consumer consumer = test.new Consumer("消費者001", basket);    service.submit(producer);    service.submit(producer2);    service.submit(consumer);    // 程序運行5s後,全部任務中止//    try {//      Thread.sleep(1000 * 5);//    } catch (InterruptedException e) {//      e.printStackTrace();//    }//    service.shutdownNow();  }}

PriorityBlockingQueue

優先級阻塞隊列,該實現類須要本身實現一個繼承了 Comparator 接口的類, 在插入資源時會按照自定義的排序規則來對資源數組進行排序。 其中值大的排在數組後面 ,取值時從數組頭開始取

public class TestQueue{  static Logger logger = LogManager.getLogger();  static Random random = new Random(47);  public static void main(String args[]) throws InterruptedException  {    PriorityBlockingQueue<PriorityEntity> queue = new PriorityBlockingQueue<PriorityEntity>();    ExecutorService executor = Executors.newCachedThreadPool();    executor.execute(new Runnable()    {      public void run()      {        int i = 0;        while (true)        {          queue.put(new PriorityEntity(random.nextInt(10), i++));          try          {            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));          }          catch (InterruptedException e)          {            logger.error(e);          }        }      }    });    executor.execute(new Runnable()    {      public void run()      {        while (true)        {          try          {            System.out.println("take-- " + queue.take() + " left:-- [" + queue.toString() + "]");            try            {              TimeUnit.MILLISECONDS.sleep(random.nextInt(3000));            }            catch (InterruptedException e)            {              logger.error(e);            }          }          catch (InterruptedException e)          {            logger.error(e);          }        }      }    });    try    {      TimeUnit.SECONDS.sleep(5);    }    catch (InterruptedException e)    {      logger.error(e);    }  }  static class PriorityEntity implements Comparable<PriorityEntity>  {    private static int count = 0;    private int id = count++;    private int priority;    private int index = 0;    public PriorityEntity(int _priority, int _index)    {      System.out.println("_priority : " + _priority);      this.priority = _priority;      this.index = _index;    }    public String toString()    {      return id + "# [index=" + index + " priority=" + priority + "]";    }    //數字小,優先級高    public int compareTo(PriorityEntity o)    {      return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0;    }  }}

線程池

開發過程當中,合理地使用線程池能夠帶來3個好處:

下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。

提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。

提升線程的可管理性:線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。

線程池做用

線程池做用就是限制系統中執行線程的數量。

根據系統的環境狀況,能夠自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了形成系統擁擠效率不高。用線程池控制線程數量,其餘線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務須要運行時,若是線程池中有等待的工做線程,就能夠開始運行了;不然進入等待隊列。

線程池的分類

newCachedThreadPool

建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程

public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                   60L, TimeUnit.SECONDS,                   new SynchronousQueue<Runnable>());  }public class ThreadPoolExecutorTest {    public static void main(String[] args) {      ExecutorService cachedThreadPool = Executors.newCachedThreadPool();       for (int i = 0; i < 10; i++) {          final int index = i;          try {          Thread.sleep(index * 1000);          } catch (InterruptedException e) {          e.printStackTrace();          }          cachedThreadPool.execute(new Runnable() {          public void run() {             System.out.println(index);          }        });       }     }  }  

線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程

newFixedThreadPool

建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue<Runnable>());}public class ThreadPoolExecutorTest {    public static void main(String[] args) {      ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);      for (int i = 0; i < 10; i++) {        final int index = i;        fixedThreadPool.execute(new Runnable() {          public void run() {            try {              System.out.println(index);              Thread.sleep(2000);            } catch (InterruptedException e) {              e.printStackTrace();            }          }        });      }    }  } 

由於線程池大小爲3,每一個任務輸出index後sleep 2秒,因此每兩秒打印3個數字。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

newScheduledThreadPool

建立一個定長線程池,支持定時及週期性任務執行。

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,       new DelayedWorkQueue());  }public class ThreadPoolExecutorTest {    public static void main(String[] args) {      ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);      scheduledThreadPool.schedule(new Runnable() {        public void run() {          System.out.println("delay 3 seconds");        }      }, 3, TimeUnit.SECONDS);    }  }  

表示延遲3秒執行

public class ThreadPoolExecutorTest {    public static void main(String[] args) {      ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);      scheduledThreadPool.scheduleAtFixedRate(new Runnable() {        public void run() {          System.out.println("delay 1 seconds, and excute every 3 seconds");        }      }, 1, 3, TimeUnit.SECONDS);    }  }  

表示延遲1秒後每3秒執行一次

newSingleThreadExecutor

建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。

public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService      (new ThreadPoolExecutor(1, 1,                  0L, TimeUnit.MILLISECONDS,                  new LinkedBlockingQueue<Runnable>()));  }

阿里發佈的 Java開發手冊中強制線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險

ThreadPoolExecutor  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory,               RejectedExecutionHandler handler) 

corePoolSize - 線程池核心池的大小。

maximumPoolSize - 線程池的最大線程數。

keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。

unit - keepAliveTime 的時間單位。

workQueue - 用來儲存等待執行任務的隊列。

threadFactory - 線程工廠。

handler - 拒絕策略。

線程優先級:

corePoolSize > workQueue > maximumPoolSize>handler(拒絕)

拒絕策略:

  1. CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重複調用 execute() 方法,直到成功。
  2. AbortPolicy :對拒絕任務拋棄處理,而且拋出異常。(默認使用的)
  3. DiscardPolicy :對拒絕任務直接無聲拋棄,沒有異常信息。
  4. DiscardOldestPolicy :對拒絕任務不拋棄,而是拋棄隊列裏面等待最久的一個線程,而後把拒絕任務加到隊列。

合理配置線程池

要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:

  • 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  • 任務的優先級:高,中和低。
  • 任務的執行時間:長,中和短。
  • 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。
  • 任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲須要等待IO操做,線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
  • 優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
  • 執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
  • 依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

CPU密集型時,任務能夠少配置線程數,大概和機器的cpu核數至關,這樣可使得每一個線程都在執行任務 IO密集型時,大部分線程都阻塞,故須要多配置線程數,2*cpu核數 操做系統之名稱解釋: 某些進程花費了絕大多數時間在計算上,而其餘則在等待I/O上花費了大可能是時間,前者稱爲計算密集型(CPU密集型)computer-bound,後者稱爲I/O密集型,I/O-bound。

 

寫在最後:歡迎留言討論,加關注,持續更新!!!

相關文章
相關標籤/搜索