多線程是實現併發機制的一種有效手段。在 Java 中實現多線程有兩種手段,一種是繼承 Thread 類,另外一種就是實現 Runnable/Callable 接口。java
java.util.concurrent 包是專爲 Java併發編程
而設計的包。類圖以下:編程
1、同步數組
1.1 synchronized 關鍵字,用來給對象和方法或者代碼塊加鎖。緩存
同步方法 synchronized T methodName(){} 同步方法鎖定的是當前對象。當多線程經過同一個對象引用屢次調用當前同步方法時,需同步執行。靜態同步方法,鎖的是當前類型的類對象。
同步方法隻影響鎖定同一個鎖對象的同步方法。不影響其餘線程調用非同步方法,或調用其餘鎖資源的同步方法。
鎖可重入。 同一個線程,屢次調用同步代碼,鎖定同一個鎖對象,可重入。子類同步方法覆蓋父類同步方法。能夠指定調用父類的同步方法,至關於鎖的重入。
當同步方法中發生異常的時候,自動釋放鎖資源。不會影響其餘線程的執行。
同步代碼塊 T methodName(){ synchronized(object){} } 同步代碼塊在執行時,是鎖定 object 對象。當多個線程調用同一個方法時,鎖定對象不變的狀況下,需同步執行。 同步代碼塊的同步粒度更加細緻,效率更高。 T methodName(){ synchronized(this){} } 當鎖定對象爲 this 時,至關於同步方法。
同步代碼一旦加鎖後,那麼會有一個臨時的鎖引用執行鎖對象,和真實的引用無直接關聯。在鎖未釋放以前,修改鎖對象引用,不會影響同步代碼的執行。安全
Java 虛擬機中的同步(Synchronization)基於進入和退出管程(Monitor)對象實現。同步方法 並非由 monitor enter 和 monitor exit 指令來實現同步的,而是由方法調用指令讀取運行時常量池中方法的 ACC_SYNCHRONIZED 標誌來隱式實現的。在 Java 虛擬機(HotSpot)中,monitor 是由 ObjectMonitor 實現的。服務器
1 /** 2 * synchronized關鍵字 3 * 鎖對象。synchronized(this)和synchronized方法都是鎖當前對象。 4 */ 5 package concurrent.t01; 6 7 import java.util.concurrent.TimeUnit; 8 9 public class Test_01 { 10 private int count = 0; 11 private Object o = new Object(); 12 13 public void testSync1(){ 14 synchronized(o){ 15 System.out.println(Thread.currentThread().getName() 16 + " count = " + count++); 17 } 18 } 19 20 public void testSync2(){ 21 synchronized(this){ 22 System.out.println(Thread.currentThread().getName() 23 + " count = " + count++); 24 } 25 } 26 27 public synchronized void testSync3(){ 28 System.out.println(Thread.currentThread().getName() 29 + " count = " + count++); 30 try { 31 TimeUnit.SECONDS.sleep(3); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 } 36 37 public static void main(String[] args) { 38 final Test_01 t = new Test_01(); 39 new Thread(new Runnable() { 40 @Override 41 public void run() { 42 t.testSync3(); 43 } 44 }).start(); 45 new Thread(new Runnable() { 46 @Override 47 public void run() { 48 t.testSync3(); 49 } 50 }).start(); 51 } 52 53 }
1.2 volatile 關鍵字多線程
變量的線程可見性。在 CPU 計算過程當中,會將計算過程須要的數據加載到 CPU 計算緩存中,當 CPU 計算中斷時,有可能刷新緩存,從新讀取內存中的數據。在線程運行的過程當中,若是某變量被其餘線程修改,可能形成數據不一致的狀況,從而致使結果錯誤。而 volatile修飾的變量是線程可見的,當 JVM 解釋 volatile 修飾的變量時,會通知 CPU,在計算過程當中,每次使用變量參與計算時,都會檢查內存中的數據是否發生變化,而不是一直使用 CPU 緩存中的數據,能夠保證計算結果的正確。volatile 只是通知底層計算時,CPU 檢查內存數據,而不是讓一個變量在多個線程中同步。併發
1 /** 2 * volatile關鍵字 3 * volatile的可見性 4 * 通知OS操做系統底層,在CPU計算過程當中,都要檢查內存中數據的有效性。保證最新的內存數據被使用。 5 * 6 */ 7 package concurrent.t01; 8 9 import java.util.concurrent.TimeUnit; 10 11 public class Test_09 { 12 13 volatile boolean b = true; 14 15 void m(){ 16 System.out.println("start"); 17 while(b){} 18 System.out.println("end"); 19 } 20 21 public static void main(String[] args) { 22 final Test_09 t = new Test_09(); 23 new Thread(new Runnable() { 24 @Override 25 public void run() { 26 t.m(); 27 } 28 }).start(); 29 30 try { 31 TimeUnit.SECONDS.sleep(1); 32 } catch (InterruptedException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } 36 37 t.b = false; 38 } 39 40 }
1 /** 2 * volatile關鍵字 3 * volatile的非原子性問題 4 * volatile, 只能保證可見性,不能保證原子性。 5 * 不是枷鎖問題,只是內存數據可見。 6 */ 7 package concurrent.t01; 8 9 import java.util.ArrayList; 10 import java.util.List; 11 12 public class Test_10 { 13 14 volatile int count = 0; 15 /*synchronized*/ void m(){ 16 for(int i = 0; i < 10000; i++){ 17 count++; 18 } 19 } 20 21 public static void main(String[] args) { 22 final Test_10 t = new Test_10(); 23 List<Thread> threads = new ArrayList<>(); 24 for(int i = 0; i < 10; i++){ 25 threads.add(new Thread(new Runnable() { 26 @Override 27 public void run() { 28 t.m(); 29 } 30 })); 31 } 32 for(Thread thread : threads){ 33 thread.start(); 34 } 35 for(Thread thread : threads){ 36 try { 37 thread.join(); 38 } catch (InterruptedException e) { 39 // TODO Auto-generated catch block 40 e.printStackTrace(); 41 } 42 } 43 System.out.println(t.count); 44 } 45 }
1.3 wait¬ifydom
當線程執行wait()方法時候,會釋放當前的鎖,而後讓出CPU,進入等待狀態。只有當 notify/notifyAll() 被執行時候,纔會喚醒一個或多個正處於等待狀態的線程,而後繼續往下執行,直到執行完synchronized 代碼塊的代碼或是中途遇到wait() ,再次釋放鎖。ide
因爲 wait()、notify/notifyAll() 在synchronized 代碼塊執行,說明當前線程必定是獲取了鎖的。wait()、notify/notifyAll() 方法是Object的本地final方法,沒法被重寫。
1 /** 2 * 生產者消費者 3 * wait¬ify 4 * wait/notify都是和while配合應用的。能夠避免多線程併發判斷邏輯失效問題。各位想一想爲何不能用if 5 */ 6 package concurrent.t04; 7 8 import java.util.LinkedList; 9 import java.util.concurrent.TimeUnit; 10 11 public class TestContainer01<E> { 12 13 private final LinkedList<E> list = new LinkedList<>(); 14 private final int MAX = 10; 15 private int count = 0; 16 17 public synchronized int getCount(){ 18 return count; 19 } 20 21 public synchronized void put(E e){ 22 while(list.size() == MAX){ 23 try { 24 this.wait(); 25 } catch (InterruptedException e1) { 26 e1.printStackTrace(); 27 } 28 } 29 30 list.add(e); 31 count++; 32 this.notifyAll(); 33 } 34 35 public synchronized E get(){ 36 E e = null; 37 while(list.size() == 0){ 38 try{ 39 this.wait(); 40 } catch (InterruptedException e1) { 41 e1.printStackTrace(); 42 } 43 } 44 e = list.removeFirst(); 45 count--; 46 this.notifyAll(); 47 return e; 48 } 49 50 public static void main(String[] args) { 51 final TestContainer01<String> c = new TestContainer01<>(); 52 for(int i = 0; i < 10; i++){ 53 new Thread(new Runnable() { 54 @Override 55 public void run() { 56 for(int j = 0; j < 5; j++){ 57 System.out.println(c.get()); 58 } 59 } 60 }, "consumer"+i).start(); 61 } 62 try { 63 TimeUnit.SECONDS.sleep(2); 64 } catch (InterruptedException e) { 65 // TODO Auto-generated catch block 66 e.printStackTrace(); 67 } 68 for(int i = 0; i < 2; i++){ 69 new Thread(new Runnable() { 70 @Override 71 public void run() { 72 for(int j = 0; j < 25; j++){ 73 c.put("container value " + j); 74 } 75 } 76 }, "producer"+i).start(); 77 } 78 } 79 80 }
1.4 AtomicXxx 類型
原子類型。
在 concurrent.atomic 包中定義了若干原子類型,這些類型中的每一個方法都是保證了原子操做的。多線程併發訪問原子類型對象中的方法,不會出現數據錯誤。在多線程開發中,若是某數據須要多個線程同時操做,且要求計算原子性,能夠考慮使用原子類型對象。
注意:原子類型中的方法 是保證了原子操做,但多個方法之間是沒有原子性的。即訪問對2個或2個以上的atomic變量(或者對單個atomic變量進行2次或2次以上的操做),仍是須要同步。
1 /** 2 * AtomicXxx 3 * 同步類型 4 * 原子操做類型。 其中的每一個方法都是原子操做。能夠保證線程安全。 5 */ 6 package concurrent.t01; 7 8 import java.util.ArrayList; 9 import java.util.List; 10 import java.util.concurrent.atomic.AtomicInteger; 11 12 public class Test_11 { 13 AtomicInteger count = new AtomicInteger(0); 14 void m(){ 15 for(int i = 0; i < 10000; i++){ 16 /*if(count.get() < 1000)*/ 17 count.incrementAndGet(); 18 } 19 } 20 21 public static void main(String[] args) { 22 final Test_11 t = new Test_11(); 23 List<Thread> threads = new ArrayList<>(); 24 for(int i = 0; i < 10; i++){ 25 threads.add(new Thread(new Runnable() { 26 @Override 27 public void run() { 28 t.m(); 29 } 30 })); 31 } 32 for(Thread thread : threads){ 33 thread.start(); 34 } 35 for(Thread thread : threads){ 36 try { 37 thread.join(); 38 } catch (InterruptedException e) { 39 // TODO Auto-generated catch block 40 e.printStackTrace(); 41 } 42 } 43 System.out.println(t.count.intValue()); 44 } 45 }
1.5 CountDownLatch 門閂
門閂是 concurrent 包中定義的一個類型,是用於多線程通信的一個輔助類型。門閂至關於在一個門上加多個鎖,當線程調用 await 方法時,會檢查門閂數量,若是門閂數量大於 0,線程會阻塞等待。當線程調用 countDown 時,會遞減門閂的數量,當門閂數量爲 0 時,await 阻塞線程可執行。
1 /** 2 * 門閂 - CountDownLatch 3 * 能夠和鎖混合使用,或替代鎖的功能。 4 * 在門閂未徹底開放以前等待。當門閂徹底開放後執行。 5 * 避免鎖的效率低下問題。 6 */ 7 package concurrent.t01; 8 9 import java.util.concurrent.CountDownLatch; 10 import java.util.concurrent.TimeUnit; 11 12 public class Test_15 { 13 CountDownLatch latch = new CountDownLatch(5); 14 15 void m1(){ 16 try { 17 latch.await();// 等待門閂開放。 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 System.out.println("m1() method"); 22 } 23 24 void m2(){ 25 for(int i = 0; i < 10; i++){ 26 if(latch.getCount() != 0){ 27 System.out.println("latch count : " + latch.getCount()); 28 latch.countDown(); // 減門閂上的鎖。 29 } 30 try { 31 TimeUnit.MILLISECONDS.sleep(500); 32 } catch (InterruptedException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } 36 System.out.println("m2() method : " + i); 37 } 38 } 39 40 public static void main(String[] args) { 41 final Test_15 t = new Test_15(); 42 new Thread(new Runnable() { 43 @Override 44 public void run() { 45 t.m1(); 46 } 47 }).start(); 48 49 new Thread(new Runnable() { 50 @Override 51 public void run() { 52 t.m2(); 53 } 54 }).start(); 55 } 56 57 }
1.6 鎖的重入
在 Java 中,同步鎖是能夠重入的。只有同一線程調用同步方法或執行同步代碼塊,對同一個對象加鎖時纔可重入。
當線程持有鎖時,會在 monitor 的計數器中執行遞增計算,若當前線程調用其餘同步代碼,且同步代碼的鎖對象相同時,monitor 中的計數器繼續遞增。每一個同步代碼執行結束,monitor 中的計數器都會遞減,直至全部同步代碼執行結束,monitor 中的計數器爲 0 時,釋放鎖標記,_Owner 標記賦值爲 null。
1 /** 2 * 鎖可重入。 同一個線程,屢次調用同步代碼,鎖定同一個鎖對象,可重入。 3 */ 4 package concurrent.t01; 5 6 import java.util.concurrent.TimeUnit; 7 8 public class Test_06 { 9 10 synchronized void m1(){ // 鎖this 11 System.out.println("m1 start"); 12 try { 13 TimeUnit.SECONDS.sleep(2); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 m2(); 18 System.out.println("m1 end"); 19 } 20 synchronized void m2(){ // 鎖this 21 System.out.println("m2 start"); 22 try { 23 TimeUnit.SECONDS.sleep(1); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 System.out.println("m2 end"); 28 } 29 30 public static void main(String[] args) { 31 32 new Test_06().m1(); 33 34 } 35 36 }
1.7 ReentrantLock
重入鎖,建議應用的同步方式。相對效率比 synchronized 高。量級較輕。使用重入鎖, 必須手工釋放鎖標記。通常都是在 finally 代碼塊中定義釋放鎖標記的 unlock 方法。
1 /** 2 * ReentrantLock 3 * 重入鎖 4 */ 5 package concurrent.t03; 6 7 import java.util.concurrent.TimeUnit; 8 import java.util.concurrent.locks.Lock; 9 import java.util.concurrent.locks.ReentrantLock; 10 11 public class Test_01 { 12 Lock lock = new ReentrantLock(); 13 14 void m1(){ 15 try{ 16 lock.lock(); // 加鎖 17 for(int i = 0; i < 10; i++){ 18 TimeUnit.SECONDS.sleep(1); 19 System.out.println("m1() method " + i); 20 } 21 }catch(InterruptedException e){ 22 e.printStackTrace(); 23 }finally{ 24 lock.unlock(); // 解鎖 25 } 26 } 27 28 void m2(){ 29 lock.lock(); 30 System.out.println("m2() method"); 31 lock.unlock(); 32 } 33 34 public static void main(String[] args) { 35 final Test_01 t = new Test_01(); 36 new Thread(new Runnable() { 37 @Override 38 public void run() { 39 t.m1(); 40 } 41 }).start(); 42 try { 43 TimeUnit.SECONDS.sleep(1); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } 47 new Thread(new Runnable() { 48 @Override 49 public void run() { 50 t.m2(); 51 } 52 }).start(); 53 } 54 }
1 /** 2 * 嘗試鎖 3 */ 4 package concurrent.t03; 5 6 import java.util.concurrent.TimeUnit; 7 import java.util.concurrent.locks.Lock; 8 import java.util.concurrent.locks.ReentrantLock; 9 10 public class Test_02 { 11 Lock lock = new ReentrantLock(); 12 13 void m1(){ 14 try{ 15 lock.lock(); 16 for(int i = 0; i < 10; i++){ 17 TimeUnit.SECONDS.sleep(1); 18 System.out.println("m1() method " + i); 19 } 20 }catch(InterruptedException e){ 21 e.printStackTrace(); 22 }finally{ 23 lock.unlock(); 24 } 25 } 26 27 void m2(){ 28 boolean isLocked = false; 29 try{ 30 // 嘗試鎖, 若是有鎖,沒法獲取鎖標記,返回false。 31 // 若是獲取鎖標記,返回true 32 // isLocked = lock.tryLock(); 33 34 // 阻塞嘗試鎖,阻塞參數表明的時長,嘗試獲取鎖標記。 35 // 若是超時,不等待。直接返回。 36 isLocked = lock.tryLock(5, TimeUnit.SECONDS); 37 38 if(isLocked){ 39 System.out.println("m2() method synchronized"); 40 }else{ 41 System.out.println("m2() method unsynchronized"); 42 } 43 }catch(Exception e){ 44 e.printStackTrace(); 45 }finally{ 46 if(isLocked){ 47 // 嘗試鎖在解除鎖標記的時候,必定要判斷是否獲取到鎖標記。 48 // 若是當前線程沒有獲取到鎖標記,會拋出異常。 49 lock.unlock(); 50 } 51 } 52 } 53 54 public static void main(String[] args) { 55 final Test_02 t = new Test_02(); 56 new Thread(new Runnable() { 57 @Override 58 public void run() { 59 t.m1(); 60 } 61 }).start(); 62 try { 63 TimeUnit.SECONDS.sleep(1); 64 } catch (InterruptedException e) { 65 // TODO Auto-generated catch block 66 e.printStackTrace(); 67 } 68 new Thread(new Runnable() { 69 @Override 70 public void run() { 71 t.m2(); 72 } 73 }).start(); 74 } 75 }
1 /** 2 * 可打斷 3 * 4 * 阻塞狀態: 包括普通阻塞,等待隊列,鎖池隊列。 5 * 普通阻塞: sleep(10000), 能夠被打斷。調用thread.interrupt()方法,能夠打斷阻塞狀態,拋出異常。 6 * 等待隊列: wait()方法被調用,也是一種阻塞狀態,只能由notify喚醒。沒法打斷 7 * 鎖池隊列: 沒法獲取鎖標記。不是全部的鎖池隊列均可被打斷。 8 * 使用ReentrantLock的lock方法,獲取鎖標記的時候,若是須要阻塞等待鎖標記,沒法被打斷。 9 * 使用ReentrantLock的lockInterruptibly方法,獲取鎖標記的時候,若是須要阻塞等待,能夠被打斷。 10 * 11 */ 12 package concurrent.t03; 13 14 import java.util.concurrent.TimeUnit; 15 import java.util.concurrent.locks.Lock; 16 import java.util.concurrent.locks.ReentrantLock; 17 18 public class Test_03 { 19 Lock lock = new ReentrantLock(); 20 21 void m1(){ 22 try{ 23 lock.lock(); 24 for(int i = 0; i < 5; i++){ 25 TimeUnit.SECONDS.sleep(1); 26 System.out.println("m1() method " + i); 27 } 28 }catch(InterruptedException e){ 29 e.printStackTrace(); 30 }finally{ 31 lock.unlock(); 32 } 33 } 34 35 void m2(){ 36 try{ 37 lock.lockInterruptibly(); // 可嘗試打斷,阻塞等待鎖。能夠被其餘的線程打斷阻塞狀態 38 System.out.println("m2() method"); 39 }catch(InterruptedException e){ 40 System.out.println("m2() method interrupted"); 41 }finally{ 42 try{ 43 lock.unlock(); 44 }catch(Exception e){ 45 e.printStackTrace(); 46 } 47 } 48 } 49 50 public static void main(String[] args) { 51 final Test_03 t = new Test_03(); 52 new Thread(new Runnable() { 53 @Override 54 public void run() { 55 t.m1(); 56 } 57 }).start(); 58 try { 59 TimeUnit.SECONDS.sleep(1); 60 } catch (InterruptedException e) { 61 // TODO Auto-generated catch block 62 e.printStackTrace(); 63 } 64 Thread t2 = new Thread(new Runnable() { 65 @Override 66 public void run() { 67 t.m2(); 68 } 69 }); 70 t2.start(); 71 try { 72 TimeUnit.SECONDS.sleep(1); 73 } catch (InterruptedException e) { 74 // TODO Auto-generated catch block 75 e.printStackTrace(); 76 } 77 t2.interrupt();// 打斷線程休眠。非正常結束阻塞狀態的線程,都會拋出異常。 78 } 79 }
1 /** 2 * 公平鎖 3 */ 4 package concurrent.t03; 5 6 import java.util.concurrent.locks.ReentrantLock; 7 8 public class Test_04 { 9 10 public static void main(String[] args) { 11 TestReentrantlock t = new TestReentrantlock(); 12 //TestSync t = new TestSync(); 13 Thread t1 = new Thread(t); 14 Thread t2 = new Thread(t); 15 t1.start(); 16 t2.start(); 17 } 18 } 19 20 class TestReentrantlock extends Thread{ 21 // 定義一個公平鎖 22 private static ReentrantLock lock = new ReentrantLock(true); 23 public void run(){ 24 for(int i = 0; i < 5; i++){ 25 lock.lock(); 26 try{ 27 System.out.println(Thread.currentThread().getName() + " get lock"); 28 }finally{ 29 lock.unlock(); 30 } 31 } 32 } 33 34 } 35 36 class TestSync extends Thread{ 37 public void run(){ 38 for(int i = 0; i < 5; i++){ 39 synchronized (this) { 40 System.out.println(Thread.currentThread().getName() + " get lock in TestSync"); 41 } 42 } 43 } 44 }
1 /** 2 * 生產者消費者 3 * 重入鎖&條件 4 * 條件 - Condition, 爲Lock增長條件。當條件知足時,作什麼事情,如加鎖或解鎖。如等待或喚醒 5 */ 6 package concurrent.t04; 7 8 import java.io.IOException; 9 import java.util.LinkedList; 10 import java.util.concurrent.TimeUnit; 11 import java.util.concurrent.locks.Condition; 12 import java.util.concurrent.locks.Lock; 13 import java.util.concurrent.locks.ReentrantLock; 14 15 public class TestContainer02<E> { 16 17 private final LinkedList<E> list = new LinkedList<>(); 18 private final int MAX = 10; 19 private int count = 0; 20 21 private Lock lock = new ReentrantLock(); 22 private Condition producer = lock.newCondition(); 23 private Condition consumer = lock.newCondition(); 24 25 public int getCount(){ 26 return count; 27 } 28 29 public void put(E e){ 30 lock.lock(); 31 try { 32 while(list.size() == MAX){ 33 System.out.println(Thread.currentThread().getName() + " 等待。。。"); 34 // 進入等待隊列。釋放鎖標記。 35 // 藉助條件,進入的等待隊列。 36 producer.await(); 37 } 38 System.out.println(Thread.currentThread().getName() + " put 。。。"); 39 list.add(e); 40 count++; 41 // 藉助條件,喚醒全部的消費者。 42 consumer.signalAll(); 43 } catch (InterruptedException e1) { 44 e1.printStackTrace(); 45 } finally { 46 lock.unlock(); 47 } 48 } 49 50 public E get(){ 51 E e = null; 52 53 lock.lock(); 54 try { 55 while(list.size() == 0){ 56 System.out.println(Thread.currentThread().getName() + " 等待。。。"); 57 // 藉助條件,消費者進入等待隊列 58 consumer.await(); 59 } 60 System.out.println(Thread.currentThread().getName() + " get 。。。"); 61 e = list.removeFirst(); 62 count--; 63 // 藉助條件,喚醒全部的生產者 64 producer.signalAll(); 65 } catch (InterruptedException e1) { 66 e1.printStackTrace(); 67 } finally { 68 lock.unlock(); 69 } 70 71 return e; 72 } 73 74 public static void main(String[] args) { 75 final TestContainer02<String> c = new TestContainer02<>(); 76 for(int i = 0; i < 10; i++){ 77 new Thread(new Runnable() { 78 @Override 79 public void run() { 80 for(int j = 0; j < 5; j++){ 81 System.out.println(c.get()); 82 } 83 } 84 }, "consumer"+i).start(); 85 } 86 try { 87 TimeUnit.SECONDS.sleep(2); 88 } catch (InterruptedException e1) { 89 e1.printStackTrace(); 90 } 91 for(int i = 0; i < 2; i++){ 92 new Thread(new Runnable() { 93 @Override 94 public void run() { 95 for(int j = 0; j < 25; j++){ 96 c.put("container value " + j); 97 } 98 } 99 }, "producer"+i).start(); 100 } 101 } 102 103 }
1.8 ThreadLocal
ThreadLocal 提供了線程本地的實例。它與普通變量的區別在於,每一個使用該變量的線程都會初始化一個徹底獨立的實例副本。ThreadLocal 變量一般被private static
修飾。當一個線程結束時,它所使用的全部 ThreadLocal 相對的實例副本均可被回收。
ThreadLocal 適用於每一個線程須要本身獨立的實例且該實例須要在多個方法中被使用,也即變量在線程間隔離而在方法或類間共享的場景。每一個 Thread 有本身的實例副本,且其它 Thread 不可訪問,那就不存在多線程間共享的問題。
1 /** 2 * ThreadLocal 3 * 就是一個Map。key - 》 Thread.getCurrentThread(). value - 》 線程須要保存的變量。 4 * ThreadLocal.set(value) -> map.put(Thread.getCurrentThread(), value); 5 * ThreadLocal.get() -> map.get(Thread.getCurrentThread()); 6 * 內存問題 : 在併發量高的時候,可能有內存溢出。 7 * 使用ThreadLocal的時候,必定注意回收資源問題,每一個線程結束以前,將當前線程保存的線程變量必定要刪除 。 8 * ThreadLocal.remove(); 9 */ 10 package concurrent.t05; 11 12 import java.util.concurrent.TimeUnit; 13 14 public class Test_01 { 15 16 volatile static String name = "zhangsan"; 17 static ThreadLocal<String> tl = new ThreadLocal<>(); 18 19 public static void main(String[] args) { 20 new Thread(new Runnable() { 21 @Override 22 public void run() { 23 try { 24 TimeUnit.SECONDS.sleep(2); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 System.out.println(name); 29 System.out.println(tl.get()); 30 } 31 }).start(); 32 33 new Thread(new Runnable() { 34 @Override 35 public void run() { 36 try { 37 TimeUnit.SECONDS.sleep(1); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 name = "lisi"; 42 tl.set("wangwu"); 43 } 44 }).start(); 45 } 46 47 }
若是調用 ThreadLocal 的 set 方法將一個對象放入Thread中的成員變量threadLocals 中,那麼這個對象是永遠不會被回收的,由於這個對象永遠都被Thread中的成員變量threadLocals引用着,可能會形成 OutOfMemoryError。須要調用 ThreadLocal 的 remove 方法 將對象從thread中的成員變量threadLocals中刪除掉。
2、同步容器
線程安全的容器對象: Vector, Hashtable。線程安全容器對象,都是使用 synchronized方法實現的。
concurrent 包中的同步容器,大多數是使用系統底層技術實現的線程安全。相似 native。Java8 中使用 CAS。
2.1 Map/Set
底層哈希實現的同步 Map(Set)。效率高,線程安全。使用系統底層技術實現線程安全。量級較 synchronized 低。key 和 value 不能爲 null。
底層跳錶(SkipList)實現的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。
1 /** 2 * 併發容器 - ConcurrentMap 3 */ 4 package concurrent.t06; 5 6 import java.util.HashMap; 7 import java.util.Hashtable; 8 import java.util.Map; 9 import java.util.Random; 10 import java.util.concurrent.ConcurrentHashMap; 11 import java.util.concurrent.ConcurrentSkipListMap; 12 import java.util.concurrent.CountDownLatch; 13 14 public class Test_01_ConcurrentMap { 15 16 public static void main(String[] args) { 17 final Map<String, String> map = new Hashtable<>(); 18 // final Map<String, String> map = new ConcurrentHashMap<>(); 19 // final Map<String, String> map = new ConcurrentSkipListMap<>(); 20 final Random r = new Random(); 21 Thread[] array = new Thread[100]; 22 final CountDownLatch latch = new CountDownLatch(array.length); 23 24 long begin = System.currentTimeMillis(); 25 for(int i = 0; i < array.length; i++){ 26 array[i] = new Thread(new Runnable() { 27 @Override 28 public void run() { 29 for(int j = 0; j < 10000; j++){ 30 map.put("key"+r.nextInt(100000), "value"+r.nextInt(100000)); 31 } 32 latch.countDown(); 33 } 34 }); 35 } 36 for(Thread t : array){ 37 t.start(); 38 } 39 try { 40 latch.await(); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 long end = System.currentTimeMillis(); 45 System.out.println("執行時間爲 : " + (end-begin) + "毫秒!"); 46 } 47 48 }
2.2 List
1 /** 2 * 併發容器 - CopyOnWriteList 3 * 寫時複製集合。寫入效率低,讀取效率高。每次寫入數據,都會建立一個新的底層數組。 4 */ 5 package concurrent.t06; 6 7 import java.util.ArrayList; 8 import java.util.List; 9 import java.util.Random; 10 import java.util.Vector; 11 import java.util.concurrent.CopyOnWriteArrayList; 12 import java.util.concurrent.CountDownLatch; 13 14 public class Test_02_CopyOnWriteList { 15 16 public static void main(String[] args) { 17 // final List<String> list = new ArrayList<>(); 18 // final List<String> list = new Vector<>(); 19 final List<String> list = new CopyOnWriteArrayList<>(); 20 final Random r = new Random(); 21 Thread[] array = new Thread[100]; 22 final CountDownLatch latch = new CountDownLatch(array.length); 23 24 long begin = System.currentTimeMillis(); 25 for(int i = 0; i < array.length; i++){ 26 array[i] = new Thread(new Runnable() { 27 @Override 28 public void run() { 29 for(int j = 0; j < 1000; j++){ 30 list.add("value" + r.nextInt(100000)); 31 } 32 latch.countDown(); 33 } 34 }); 35 } 36 for(Thread t : array){ 37 t.start(); 38 } 39 try { 40 latch.await(); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 long end = System.currentTimeMillis(); 45 System.out.println("執行時間爲 : " + (end-begin) + "毫秒!"); 46 System.out.println("List.size() : " + list.size()); 47 } 48 49 }
2.3 Queue
1 /** 2 * 併發容器 - ConcurrentLinkedQueue 3 * 隊列 - 鏈表實現的。 4 */ 5 package concurrent.t06; 6 7 import java.util.Queue; 8 import java.util.concurrent.ConcurrentLinkedQueue; 9 10 public class Test_03_ConcurrentLinkedQueue { 11 12 public static void main(String[] args) { 13 Queue<String> queue = new ConcurrentLinkedQueue<>(); 14 for(int i = 0; i < 10; i++){ 15 queue.offer("value" + i); 16 } 17 18 System.out.println(queue); 19 System.out.println(queue.size()); 20 21 // peek() -> 查看queue中的首數據 22 System.out.println(queue.peek()); 23 System.out.println(queue.size()); 24 25 // poll() -> 獲取queue中的首數據 26 System.out.println(queue.poll()); 27 System.out.println(queue.size()); 28 } 29 30 }
1 /** 2 * 併發容器 - LinkedBlockingQueue 3 * 阻塞容器。 4 * put & take - 自動阻塞。 5 * put自動阻塞, 隊列容量滿後,自動阻塞 6 * take自動阻塞方法, 隊列容量爲0後,自動阻塞。 7 */ 8 package concurrent.t06; 9 10 import java.util.Random; 11 import java.util.concurrent.BlockingQueue; 12 import java.util.concurrent.LinkedBlockingQueue; 13 import java.util.concurrent.TimeUnit; 14 15 public class Test_04_LinkedBlockingQueue { 16 17 final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 18 final Random r = new Random(); 19 20 public static void main(String[] args) { 21 final Test_04_LinkedBlockingQueue t = new Test_04_LinkedBlockingQueue(); 22 23 new Thread(new Runnable() { 24 @Override 25 public void run() { 26 while(true){ 27 try { 28 t.queue.put("value"+t.r.nextInt(1000)); 29 TimeUnit.SECONDS.sleep(1); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 } 34 } 35 }, "producer").start(); 36 37 for(int i = 0; i < 3; i++){ 38 new Thread(new Runnable() { 39 @Override 40 public void run() { 41 while(true){ 42 try { 43 System.out.println(Thread.currentThread().getName() + 44 " - " + t.queue.take()); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 } 49 } 50 }, "consumer"+i).start(); 51 } 52 } 53 54 }
1 /** 2 * 併發容器 - ArrayBlockingQueue 3 * 有界容器。 4 * 當容量不足的時候,有阻塞能力。 5 *add 方法在容量不足的時候,拋出異常。 6 *put 方法在容量不足的時候,阻塞等待。 7 *offer 方法, 8 *單參數 offer 方法,不阻塞。容量不足的時候,返回 false。當前新增數據操做放棄。 9 *三參數 offer 方法(offer(value,times,timeunit)),容量不足的時候,阻塞 times 時長(單 10 *位爲 timeunit),若是在阻塞時長內,有容量空閒,新增數據返回 true。若是阻塞時長範圍 11 *內,無容量空閒,放棄新增數據,返回 false。 12 */ 13 package concurrent.t06; 14 15 import java.util.concurrent.ArrayBlockingQueue; 16 import java.util.concurrent.BlockingQueue; 17 import java.util.concurrent.TimeUnit; 18 19 public class Test_05_ArrayBlockingQueue { 20 21 final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); 22 23 public static void main(String[] args) { 24 final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue(); 25 26 for(int i = 0; i < 5; i++){ 27 // System.out.println("add method : " + t.queue.add("value"+i)); 28 /*try { 29 t.queue.put("put"+i); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 System.out.println("put method : " + i);*/ 34 // System.out.println("offer method : " + t.queue.offer("value"+i)); 35 try { 36 System.out.println("offer method : " + 37 t.queue.offer("value"+i, 1, TimeUnit.SECONDS)); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 } 42 43 System.out.println(t.queue); 44 } 45 46 }
1 /** 2 * 併發容器 - DelayQueue 3 * 無界容器。 4 */ 5 package concurrent.t06; 6 7 import java.util.concurrent.BlockingQueue; 8 import java.util.concurrent.DelayQueue; 9 import java.util.concurrent.Delayed; 10 import java.util.concurrent.TimeUnit; 11 12 public class Test_06_DelayQueue { 13 14 static BlockingQueue<MyTask_06> queue = new DelayQueue<>(); 15 16 public static void main(String[] args) throws InterruptedException { 17 long value = System.currentTimeMillis(); 18 MyTask_06 task1 = new MyTask_06(value + 2000); 19 MyTask_06 task2 = new MyTask_06(value + 1000); 20 MyTask_06 task3 = new MyTask_06(value + 3000); 21 MyTask_06 task4 = new MyTask_06(value + 2500); 22 MyTask_06 task5 = new MyTask_06(value + 1500); 23 24 queue.put(task1); 25 queue.put(task2); 26 queue.put(task3); 27 queue.put(task4); 28 queue.put(task5); 29 30 System.out.println(queue); 31 System.out.println(value); 32 for(int i = 0; i < 5; i++){ 33 System.out.println(queue.take()); 34 } 35 } 36 37 } 38 39 class MyTask_06 implements Delayed { 40 41 private long compareValue; 42 43 public MyTask_06(long compareValue){ 44 this.compareValue = compareValue; 45 } 46 47 /** 48 * 比較大小。自動實現升序 49 * 建議和getDelay方法配合完成。 50 * 若是在DelayQueue是須要按時間完成的計劃任務,必須配合getDelay方法完成。 51 */ 52 @Override 53 public int compareTo(Delayed o) { 54 return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); 55 } 56 57 /** 58 * 獲取計劃時長的方法。 59 * 根據參數TimeUnit來決定,如何返回結果值。 60 */ 61 @Override 62 public long getDelay(TimeUnit unit) { 63 return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 64 } 65 66 @Override 67 public String toString(){ 68 return "Task compare value is : " + this.compareValue; 69 } 70 71 }
1 /** 2 * 併發容器 - LinkedTransferQueue 3 * 轉移隊列 4 * add - 隊列會保存數據,不作阻塞等待。 5 * transfer - 是TransferQueue的特有方法。必須有消費者(take()方法的調用者)。 6 * 若是沒有任意線程消費數據,transfer方法阻塞。通常用於處理即時消息。 7 */ 8 package concurrent.t06; 9 10 import java.util.concurrent.LinkedTransferQueue; 11 import java.util.concurrent.TimeUnit; 12 import java.util.concurrent.TransferQueue; 13 14 public class Test_07_TransferQueue { 15 16 TransferQueue<String> queue = new LinkedTransferQueue<>(); 17 18 public static void main(String[] args) { 19 final Test_07_TransferQueue t = new Test_07_TransferQueue(); 20 21 /*new Thread(new Runnable() { 22 @Override 23 public void run() { 24 try { 25 System.out.println(Thread.currentThread().getName() + " thread begin " ); 26 System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 }, "output thread").start(); 32 33 try { 34 TimeUnit.SECONDS.sleep(2); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 39 try { 40 t.queue.transfer("test string"); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 }*/ 44 45 new Thread(new Runnable() { 46 47 @Override 48 public void run() { 49 try { 50 t.queue.transfer("test string"); 51 // t.queue.add("test string"); 52 System.out.println("add ok"); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 }).start(); 58 59 try { 60 TimeUnit.SECONDS.sleep(2); 61 } catch (InterruptedException e) { 62 e.printStackTrace(); 63 } 64 65 new Thread(new Runnable() { 66 @Override 67 public void run() { 68 try { 69 System.out.println(Thread.currentThread().getName() + " thread begin " ); 70 System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); 71 } catch (InterruptedException e) { 72 e.printStackTrace(); 73 } 74 } 75 }, "output thread").start(); 76 77 } 78 79 }
1 /** 2 * 併發容器 - SynchronousQueue 3 * 必須現有消費線程等待,才能使用的隊列。 4 * add 方法,無阻塞。若沒有消費線程阻塞等待數據,則拋出異常。 5 * put 方法,有阻塞。若沒有消費線程阻塞等待數據,則阻塞。 6 */ 7 package concurrent.t06; 8 9 import java.util.concurrent.BlockingQueue; 10 import java.util.concurrent.SynchronousQueue; 11 import java.util.concurrent.TimeUnit; 12 13 public class Test_08_SynchronusQueue { 14 15 BlockingQueue<String> queue = new SynchronousQueue<>(); 16 17 public static void main(String[] args) { 18 final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue(); 19 20 new Thread(new Runnable() { 21 @Override 22 public void run() { 23 try { 24 System.out.println(Thread.currentThread().getName() + " thread begin " ); 25 try { 26 TimeUnit.SECONDS.sleep(2); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 } 35 }, "output thread").start(); 36 37 /*try { 38 TimeUnit.SECONDS.sleep(3); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 }*/ 42 // t.queue.add("test add"); 43 try { 44 t.queue.put("test put"); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 49 System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size()); 50 } 51 52 }
3、 ThreadPool&Executor
3.1 Executor
線程池頂級接口。定義方法,void execute(Runnable)。方法是用於處理任務的一個服務方法。調用者提供 Runnable 接口的實現,線程池經過線程執行這個 Runnable。服務方法無返回值的。是 Runnable 接口中的 run 方法無返回值。
經常使用方法 - void execute(Runnable)
做用是: 啓動線程任務的。
1 /** 2 * 線程池 3 * Executor - 線程池底層處理機制。 4 * 在使用線程池的時候,底層如何調用線程中的邏輯。 5 */ 6 package concurrent.t08; 7 8 import java.util.concurrent.Executor; 9 10 public class Test_01_MyExecutor implements Executor { 11 public static void main(String[] args) { 12 new Test_01_MyExecutor().execute(new Runnable() { 13 @Override 14 public void run() { 15 System.out.println(Thread.currentThread().getName() + " - test executor"); 16 } 17 }); 18 } 19 20 @Override 21 public void execute(Runnable command) { 22 new Thread(command).start(); 23 } 24 }
3.2 ExecutorService
Executor 接口的子接口。提供了一個新的服務方法,submit。有返回值(Future 類型)。submit 方法提供了 overload 方法。其中有參數類型爲 Runnable 的,不須要提供返回值的;有參數類型爲 Callable,能夠提供線程執行後的返回值。
Future,是 submit 方法的返回值。表明將來,也就是線程執行結束後的一種結果。如返回值。
常見方法 - void execute(Runnable), Future submit(Callable), Future submit(Runnable)
線程池狀態: Running, ShuttingDown, Termitnaed
3.3 Future
將來結果,表明線程任務執行結束後的結果。獲取線程執行結果的方式是經過 get 方法獲取的。get 無參,阻塞等待線程執行結束,並獲得結果。get 有參,阻塞固定時長,等待
線程執行結束後的結果,若是在阻塞時長範圍內,線程未執行結束,拋出異常。
經常使用方法: T get() T get(long, TimeUnit)
1 /** 2 * 線程池 3 * 固定容量線程池 4 */ 5 package concurrent.t08; 6 7 import java.util.concurrent.Callable; 8 import java.util.concurrent.ExecutionException; 9 import java.util.concurrent.ExecutorService; 10 import java.util.concurrent.Executors; 11 import java.util.concurrent.Future; 12 import java.util.concurrent.FutureTask; 13 import java.util.concurrent.TimeUnit; 14 15 public class Test_03_Future { 16 17 public static void main(String[] args) throws InterruptedException, ExecutionException { 18 /*FutureTask<String> task = new FutureTask<>(new Callable<String>() { 19 @Override 20 public String call() throws Exception { 21 return "first future task"; 22 } 23 }); 24 25 new Thread(task).start(); 26 27 System.out.println(task.get());*/ 28 29 ExecutorService service = Executors.newFixedThreadPool(1); 30 31 Future<String> future = service.submit(new Callable<String>() { 32 @Override 33 public String call() { 34 try { 35 TimeUnit.MILLISECONDS.sleep(500); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 System.out.println("aaa"); 40 return Thread.currentThread().getName() + " - test executor"; 41 } 42 }); 43 System.out.println(future); 44 System.out.println(future.isDone()); // 查看線程是否結束, 任務是否完成。 call方法是否執行結束 45 46 System.out.println(future.get()); // 獲取call方法的返回值。 47 System.out.println(future.isDone()); 48 } 49 50 }
3.4 Callable
可執行接口。 相似 Runnable 接口。也是能夠啓動一個線程的接口。其中定義的方法是call。call 方法的做用和 Runnable 中的 run 方法徹底一致。call 方法有返回值。
接口方法 : Object call();至關於 Runnable 接口中的 run 方法。區別爲此方法有返回值。不能拋出已檢查異常。
和 Runnable 接口的選擇 - 須要返回值或須要拋出異常時,使用 Callable,其餘狀況可任意選擇。
3.5 Executors
工具類型。爲 Executor 線程池提供工具方法。能夠快速的提供若干種線程池。如:固定容量的,無限容量的,容量爲 1 等各類線程池。
線程池是一個進程級的重量級資源。默認的生命週期和 JVM 一致。當開啓線程池後,直到 JVM 關閉爲止,是線程池的默認生命週期。若是手工調用 shutdown 方法,那麼線程池執行全部的任務後,自動關閉。
開始 - 建立線程池。
結束 - JVM 關閉或調用 shutdown 並處理完全部的任務。
相似 Arrays,Collections 等工具類型的功用。
3.6 FixedThreadPool
容量固定的線程池。活動狀態和線程池容量是有上限的線程池。全部的線程池中,都有一個任務隊列。使用的是 BlockingQueue<Runnable>做爲任務的載體。當任務數量大於線程池容量的時候,沒有運行的任務保存在任務隊列中,當線程有空閒的,自動從隊列中取出任務執行。
使用場景: 大多數狀況下,使用的線程池,首選推薦 FixedThreadPool。OS 系統和硬件是有線程支持上限。不能隨意的無限制提供線程池。
線程池默認的容量上限是 Integer.MAX_VALUE。
常見的線程池容量: PC - 200。 服務器 - 1000~10000
queued tasks - 任務隊列
completed tasks - 結束任務隊列
1 /** 2 * 線程池 3 * 固定容量線程池 4 * FixedThreadPool - 固定容量線程池。建立線程池的時候,容量固定。 5 * 構造的時候,提供線程池最大容量 6 * new xxxxx -> 7 * ExecutorService - 線程池服務類型。全部的線程池類型都實現這個接口。 8 * 實現這個接口,表明能夠提供線程池能力。 9 * shutdown - 優雅關閉。 不是強行關閉線程池,回收線程池中的資源。而是再也不處理新的任務,將已接收的任務處理完畢後 10 * 再關閉。 11 * Executors - Executor的工具類。相似Collection和Collections的關係。 12 * 能夠更簡單的建立若干種線程池。 13 */ 14 package concurrent.t08; 15 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18 import java.util.concurrent.TimeUnit; 19 20 public class Test_02_FixedThreadPool { 21 22 public static void main(String[] args) { 23 ExecutorService service = 24 Executors.newFixedThreadPool(5); 25 for(int i = 0; i < 6; i++){ 26 service.execute(new Runnable() { 27 @Override 28 public void run() { 29 try { 30 TimeUnit.MILLISECONDS.sleep(500); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 System.out.println(Thread.currentThread().getName() + " - test executor"); 35 } 36 }); 37 } 38 39 System.out.println(service); 40 41 service.shutdown(); 42 // 是否已經結束, 至關於回收了資源。 43 System.out.println(service.isTerminated()); 44 // 是否已經關閉, 是否調用過shutdown方法 45 System.out.println(service.isShutdown()); 46 System.out.println(service); 47 48 try { 49 TimeUnit.SECONDS.sleep(2); 50 } catch (InterruptedException e) { 51 e.printStackTrace(); 52 } 53 54 // service.shutdown(); 55 System.out.println(service.isTerminated()); 56 System.out.println(service.isShutdown()); 57 System.out.println(service); 58 } 59 60 }
3.7 CachedThreadPool
緩存的線程池。容量不限(Integer.MAX_VALUE)。自動擴容。容量管理策略:若是線程池中的線程數量不知足任務執行,建立新的線程。每次有新任務沒法即時處理的時候,都會建立新的線程。當線程池中的線程空閒時長達到必定的臨界值(默認 60 秒),自動釋放線程。默認線程空閒 60 秒,自動銷燬。
應用場景: 內部應用或測試應用。 內部應用,有條件的內部數據瞬間處理時應用,如:
電信平臺夜間執行數據整理(有把握在短期內處理完全部工做,且對硬件和軟件有足夠的信心)。 測試應用,在測試的時候,嘗試獲得硬件或軟件的最高負載量,用於提供FixedThreadPool 容量的指導。
1 /** 2 * 線程池 3 * 無容量限制的線程池(最大容量默認爲Integer.MAX_VALUE) 4 */ 5 package concurrent.t08; 6 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.TimeUnit; 10 11 public class Test_05_CachedThreadPool { 12 13 public static void main(String[] args) { 14 ExecutorService service = Executors.newCachedThreadPool(); 15 System.out.println(service); 16 17 for(int i = 0; i < 5; i++){ 18 service.execute(new Runnable() { 19 @Override 20 public void run() { 21 try { 22 TimeUnit.MILLISECONDS.sleep(500); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 System.out.println(Thread.currentThread().getName() + " - test executor"); 27 } 28 }); 29 } 30 31 System.out.println(service); 32 33 try { 34 TimeUnit.SECONDS.sleep(65); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 39 System.out.println(service); 40 } 41 42 }
3.8 ScheduledThreadPool
計劃任務線程池。能夠根據計劃自動執行任務的線程池。
scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
runnable - 要執行的任務。
start_limit - 第一次任務執行的間隔。
limit - 屢次任務執行的間隔。
timeunit - 屢次任務執行間隔的時間單位。
使用場景: 計劃任務時選用(DelaydQueue),如:電信行業中的數據整理,沒分鐘整理,沒消失整理,天天整理等。
1 /** 2 * 線程池 3 * 計劃任務線程池。 4 */ 5 package concurrent.t08; 6 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.ScheduledExecutorService; 9 import java.util.concurrent.TimeUnit; 10 11 public class Test_07_ScheduledThreadPool { 12 13 public static void main(String[] args) { 14 ScheduledExecutorService service = Executors.newScheduledThreadPool(3); 15 System.out.println(service); 16 17 // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) 18 // runnable - 要執行的任務。 19 service.scheduleAtFixedRate(new Runnable() { 20 @Override 21 public void run() { 22 try { 23 TimeUnit.MILLISECONDS.sleep(500); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 System.out.println(Thread.currentThread().getName()); 28 } 29 }, 0, 300, TimeUnit.MILLISECONDS); 30 31 } 32 33 }
3.9 SingleThreadExceutor
單一容量的線程池。使用場景: 保證任務順序時使用。如: 遊戲大廳中的公共頻道聊天。秒殺。
1 /** 2 * 線程池 3 * 容量爲1的線程池。 順序執行。 4 */ 5 package concurrent.t08; 6 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.TimeUnit; 10 11 public class Test_06_SingleThreadExecutor { 12 13 public static void main(String[] args) { 14 ExecutorService service = Executors.newSingleThreadExecutor(); 15 System.out.println(service); 16 17 for(int i = 0; i < 5; i++){ 18 service.execute(new Runnable() { 19 @Override 20 public void run() { 21 try { 22 TimeUnit.MILLISECONDS.sleep(500); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 System.out.println(Thread.currentThread().getName() + " - test executor"); 27 } 28 }); 29 } 30 31 } 32 33 }
3.10 ForkJoinPool
分支合併線程池(mapduce 相似的設計思想)。適合用於處理複雜任務。
初始化線程容量與 CPU 核心數相關。
線程池中運行的內容必須是 ForkJoinTask 的子類型(RecursiveTask,RecursiveAction)。ForkJoinPool - 分支合併線程池。 能夠遞歸完成複雜任務。要求可分支合併的任務必須是 ForkJoinTask 類型的子類型。其中提供了分支和合並的能力。ForkJoinTask 類型提供了兩個抽象子類型,RecursiveTask 有返回結果的分支合併任務,RecursiveAction 無返回結果的分支合併任務。(Callable/Runnable)compute 方法:就是任務的執行邏輯。
ForkJoinPool 沒有所謂的容量。默認都是 1 個線程。根據任務自動的分支新的子線程。當子線程任務結束後,自動合併。所謂自動是根據 fork 和 join 兩個方法實現的。
應用: 主要是作科學計算或天文計算的。數據分析的。
1 /** 2 * 線程池 3 * 分支合併線程池。 4 */ 5 package concurrent.t08; 6 7 import java.io.IOException; 8 import java.util.Random; 9 import java.util.concurrent.ExecutionException; 10 import java.util.concurrent.ForkJoinPool; 11 import java.util.concurrent.Future; 12 import java.util.concurrent.RecursiveTask; 13 14 public class Test_08_ForkJoinPool { 15 16 final static int[] numbers = new int[1000000]; 17 final static int MAX_SIZE = 50000; 18 final static Random r = new Random(); 19 20 21 static{ 22 for(int i = 0; i < numbers.length; i++){ 23 numbers[i] = r.nextInt(1000); 24 } 25 } 26 27 static class AddTask extends RecursiveTask<Long>{ // RecursiveAction 28 int begin, end; 29 public AddTask(int begin, int end){ 30 this.begin = begin; 31 this.end = end; 32 } 33 34 // 35 protected Long compute(){ 36 if((end - begin) < MAX_SIZE){ 37 long sum = 0L; 38 for(int i = begin; i < end; i++){ 39 sum += numbers[i]; 40 } 41 // System.out.println("form " + begin + " to " + end + " sum is : " + sum); 42 return sum; 43 }else{ 44 int middle = begin + (end - begin)/2; 45 AddTask task1 = new AddTask(begin, middle); 46 AddTask task2 = new AddTask(middle, end); 47 task1.fork();// 就是用於開啓新的任務的。 就是分支工做的。 就是開啓一個新的線程任務。 48 task2.fork(); 49 // join - 合併。將任務的結果獲取。 這是一個阻塞方法。必定會獲得結果數據。 50 return task1.join() + task2.join(); 51 } 52 } 53 } 54 55 public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { 56 long result = 0L; 57 for(int i = 0; i < numbers.length; i++){ 58 result += numbers[i]; 59 } 60 System.out.println(result); 61 62 ForkJoinPool pool = new ForkJoinPool(); 63 AddTask task = new AddTask(0, numbers.length); 64 65 Future<Long> future = pool.submit(task); 66 System.out.println(future.get()); 67 68 } 69 70 }
3.11 ThreadPoolExecutor
線程池底層實現。除 ForkJoinPool 外,其餘經常使用線程池底層都是使用 ThreadPoolExecutor實現的。
public ThreadPoolExecutor (int corePoolSize, // 核心容量,建立線程池的時候,默認有多少線程。也是線程池保持的最少線程數 int maximumPoolSize, // 最大容量,線程池最多有多少線程 long keepAliveTime, // 生命週期,0 爲永久。當線程空閒多久後,自動回收。 TimeUnit unit, // 生命週期單位,爲生命週期提供單位,如:秒,毫秒 BlockingQueue<Runnable> workQueue // 任務隊列,阻塞隊列。注意,泛型必須是 Runnable ); //使用場景: 默認提供的線程池不知足條件時使用。如:初始線程數據 4,最大線程數200,線程空閒週期 30 秒。
1 /** 2 * 線程池 3 * 固定容量線程池 4 */ 5 package concurrent.t08; 6 7 import java.util.ArrayList; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.LinkedBlockingQueue; 10 import java.util.concurrent.ThreadPoolExecutor; 11 import java.util.concurrent.TimeUnit; 12 13 public class Test_09_ThreadPoolExecutor { 14 15 public static void main(String[] args) { 16 // 模擬fixedThreadPool, 核心線程5個,最大容量5個,線程的生命週期無限。 17 ExecutorService service = 18 new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 19 new LinkedBlockingQueue<Runnable>()); 20 21 for(int i = 0; i < 6; i++){ 22 service.execute(new Runnable() { 23 @Override 24 public void run() { 25 try { 26 TimeUnit.MILLISECONDS.sleep(500); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 System.out.println(Thread.currentThread().getName() + " - test executor"); 31 } 32 }); 33 } 34 35 System.out.println(service); 36 37 service.shutdown(); 38 System.out.println(service.isTerminated()); 39 System.out.println(service.isShutdown()); 40 System.out.println(service); 41 42 try { 43 TimeUnit.SECONDS.sleep(2); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } 47 48 service.shutdown(); 49 System.out.println(service.isTerminated()); 50 System.out.println(service.isShutdown()); 51 System.out.println(service); 52 53 } 54 55 }