Java中建立線程主要有三種方式:html
其他詳略,自查網絡java
synchronized關鍵字之內存中的一個對象做爲鎖(互斥鎖),獲取到這個對象的線程能夠執行synchronized內部的代碼,執行完畢才放棄鎖web
public class T { private int count = 10; private Object o = new Object(); public void m() { synchronized(o) { //任何線程要執行下面的代碼,必須先拿到o的鎖 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }
專門建立一個無用的對象做爲鎖顯得浪費,能夠直接以當前對象做爲鎖面試
public class T { private int count = 10; public void m() { synchronized(this) { //任何線程要執行下面的代碼,必須先拿到this的鎖 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }
public class T { private int count = 10; public synchronized void m() { //等同於在方法的代碼執行時要synchronized(this) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } }
下面代碼m方法與mm方法的鎖定效果同樣api
public class T { private static int count = 10; public synchronized static void m() { //這裏等同於synchronized(yxxy.c_004.T.class) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void mm() { synchronized(T.class) { //考慮一下這裏寫synchronized(this)是否能夠? // 不能夠,靜態方法不能訪問非靜態對象 count --; } } }
下面的代碼能夠演示到多線程運行時出現的重複問題,若取消synchronized關鍵字的註釋,能夠避免這個問題數組
public class T implements Runnable { private int count = 10; public /*synchronized*/ void run() { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void main(String[] args) { T t = new T(); for(int i=0; i<5; i++) { new Thread(t, "THREAD" + i).start(); } } }
一個synchronized代碼塊是做爲原子性操做的,總體不可分緩存
緣由是隻有synchronized方法執行才須要申請鎖,其餘方法不須要申請鎖,二者互不干擾(比喻:A上廁所鎖了門與來洗手盆洗手的B沒有關係)安全
public class T { public synchronized void m1() { System.out.println(Thread.currentThread().getName() + " m1 start..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m1 end"); } public void m2() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 "); } public static void main(String[] args) { T t = new T(); /*new Thread(()->t.m1(), "t1").start(); new Thread(()->t.m2(), "t2").start();*/ new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); /* new Thread(new Runnable() { @Override public void run() { t.m1(); } }); */ } }
面試有關小知識點:銀行代碼固然須要加鎖,寫操做須要加鎖那麼讀操做須要加鎖嗎?網絡
一個線程已經擁有某個對象的鎖,再次申請相同的鎖的時候仍然會獲得該對象的鎖session
即已取得鎖的當前線程再申請獲取同一個鎖是可行的
也就是說synchronized得到的鎖是可重入的
下面代碼m1內調用m2,是能夠執行的
import java.util.concurrent.TimeUnit; public class T { synchronized void m1() { System.out.println("m1 start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } m2(); } synchronized void m2() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m2"); } }
因此,在併發處理的過程當中,有異常要多加當心,可能會發生不一致的狀況。
好比,在一個web app處理過程當中,多個servlet線程共同訪問同一個資源,這時若是異常處理不合適,在第一個線程中拋出異常,其餘線程就會進入同步代碼區,有可能會訪問到異常產生時第一個線程未修改完的數據
所以要很是當心的處理同步業務邏輯中的異常
當可能拋出異常能夠catch來避免上述錯誤(如回滾數據操做)
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while(true) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if(count == 5) { int i = 1/0; //此處拋出異常,鎖將被釋放,要想不被釋放,能夠在這裏進行catch,而後讓循環繼續 System.out.println(i); } } } public static void main(String[] args) { T t = new T(); Runnable r = new Runnable() { @Override public void run() { t.m(); } }; new Thread(r, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(r, "t2").start(); } }
最基本原理:
線程1須要按順序獲取鎖A和鎖B來執行,線程2須要按順序獲取鎖B和鎖A來執行,當線程1獲取了鎖A未獲取鎖B時線程2併發,線程2執行獲取了鎖B,此時就發生死鎖,線程1沒法獲取鎖B,線程2也沒法獲取鎖A來繼續執行,程序卡死
下面程序能夠模擬死鎖
public class T { Object a = new Object(); Object b = new Object(); public void m1() { synchronized (a) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b) { System.out.println("success1"); } } } public void m2() { synchronized (b) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { System.out.println("success2"); } } } public static void main(String[] args) { T t = new T(); new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); } }
synchronized同步代碼塊中的代碼越少越好
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m1() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //業務邏輯中只有下面這句須要sync,這時不該該給整個方法上鎖 count ++; //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } void m2() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //業務邏輯中只有下面這句須要sync,這時不該該給整個方法上鎖 //採用細粒度的鎖,可使線程爭用時間變短,從而提升效率 synchronized(this) { count ++; } //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
鎖定某對象o,若是o的屬性發生改變,不影響鎖的使用,可是若是o變成另一個對象,則鎖定的對象發生改變
應該避免將鎖定對象的引用變成另外的對象
當鎖定對象改變後,多個線程間獲取的鎖對象就有可能不同了,使得同步代碼塊失效
下面代碼說明了這個問題,鎖定對象改變了,線程t2進入同步代碼塊執行,但若鎖定對象不變,線程t2將不能進入同步代碼塊執行
import java.util.concurrent.TimeUnit; public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //啓動第一個線程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //建立第二個線程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //鎖對象發生改變,因此t2線程得以執行,若是註釋掉這句話,線程2將永遠得不到執行機會 t2.start(); } }
不要以字符串常量做爲鎖定對象,因爲String類型的特殊性(常量池),表面上變量名不一樣的兩個String對象可能指向的是同一個地址
在下面的程序中,m1和m2其實鎖定的是同一個對象
這種狀況還會發生比較詭異的現象,好比你用到了一個類庫,在該類庫中代碼鎖定了字符串「Hello」,可是你讀不到源碼,因此你在本身的代碼中也鎖定了"Hello",這時候就有可能發生很是詭異的死鎖阻塞,由於你的程序和你用到的類庫不經意間使用了同一把鎖
public class T { String s1 = "Hello"; String s2 = "Hello"; void m1() { synchronized(s1) { } } void m2() { synchronized(s2) { } } }
volatile關鍵字,能夠使一個變量在多個線程間可見
如A、B線程都用到同一個變量,Java默認是A線程中保留一份copy,這樣若是B線程修改了該變量,則A線程未必知道
在下面的代碼中,running是存在於堆內存的t對象中,當線程t1開始運行的時候,會把running值從內存中讀到(拷貝到)t1線程的工做區,在運行過程當中直接使用這個copy,並不會每次都去讀取堆內存,這樣,當主線程修改running的值以後,t1線程感知不到,因此不會中止運行
使用volatile關鍵字,將會強制全部線程都去堆內存中讀取變量running的值
能夠閱讀這篇文章進行更深刻的理解
http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
下面代碼能夠說明volatile關鍵字的效果,當running變量能夠被其餘線程改變時,while代碼塊纔會結束並打印"m end"
import java.util.concurrent.TimeUnit; public class T { /*volatile*/ boolean running = true; //對比一下有無volatile的狀況下,整個程序運行結果的區別 void m() { System.out.println("m start"); while(running) { /* try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ // 若上面幾行代碼取消註釋,有可能在cpu空閒時去讀取堆內存中的running變量的值從而結束while代碼塊 } System.out.println("m end!"); } public static void main(String[] args) { T t = new T(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }
volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
運行下面的程序輸出count的值會出現不到100000的結果,緣由是由於雖然volatile能夠保證每一個線程讀取count的值是同步的,但不能保證/要求線程寫入count的值時必定是根據此時的count值+1的操做
即讀操做和寫操做是複合操做,不構成原子性操做
解決這個問題可使用synchronized關鍵字修飾m方法便可以保證可見性和原子性同步
import java.util.ArrayList; import java.util.List; public class T { volatile int count = 0; void m() { for (int i = 0; i < 10000; i++) count++; } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
AtomicXXX類自己方法都是原子性的,但不能保證多個方法連續調用是原子性的,即多個AtomicXXX類的方法連續被調用的時候不能保證原子性
import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class T { /*volatile*/ //int count = 0; AtomicInteger count = new AtomicInteger(0); /*synchronized*/ void m() { for (int i = 0; i < 10000; i++) //if count.get() < 1000 count.incrementAndGet(); //count++ } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
線程安全有兩個方面:執行控制和內存可見
synchronized關鍵字解決的是執行控制的問題,它會阻止其它線程獲取當前對象的監控鎖,這樣就使得當前對象中被synchronized關鍵字保護的代碼塊沒法被其它線程訪問,也就沒法併發執行。更重要的是,synchronized還會建立一個內存屏障,內存屏障指令保證了全部CPU操做結果都會直接刷到主存中,從而保證了操做的內存可見性,同時也使得先得到這個鎖的線程的全部操做,都happens-before於隨後得到這個鎖的線程的操做。
volatile關鍵字解決的是內存可見性的問題,會使得全部對volatile變量的讀寫都會直接刷到主存,即保證了變量的可見性。這樣就能知足一些對變量可見性有要求而對讀取順序沒有要求的需求。
使用volatile關鍵字僅能實現對原始變量(如boolen、 short 、int 、long等)操做的原子性,但須要特別注意, volatile不能保證複合操做的原子性,即便只是i++,實際上也是由多個原子操做組成:read i; inc; write i,假如多個線程同時執行i++,volatile只能保證他們操做的i是同一塊內存,但依然可能出現寫入髒數據的狀況。
volatile的性能比synchronized的性能要高
能夠用volatile的時候儘可能避免使用synchronized
ReentrantLock能夠用於替代synchronized,前者能夠完成後者可完成的功能且更靈活(但性能沒有明顯區別)
但使用ReentrantLock必須手動釋放鎖,使用synchronized鎖定的話若是遇到異常,JVM會自動釋放鎖,可是ReentrantLock必須手動釋放鎖,所以常常在finally塊中進行鎖的釋放
ReentrantLock是Java併發包中互斥鎖,它有公平鎖和非公平鎖兩種實現方式
詳細可見https://www.jianshu.com/p/155260c8af6c
下面的代碼能夠演示使用重入鎖完成使m1方法與m2方法互斥的功能,
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock2 { Lock lock = new ReentrantLock(); // 建立鎖 void m1() { try { lock.lock(); //synchronized(this) 申請並鎖定鎖lock for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 解鎖 } } void m2() { lock.lock(); // 申請並鎖定鎖lock System.out.println("m2 ..."); lock.unlock(); // 解鎖 } public static void main(String[] args) { ReentrantLock2 rl = new ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
使用ReentrantLock能夠調用tryLock方法嘗試進行鎖定
線程能夠根據tryLock方法的返回值判斷是否鎖定並以此決定是否繼續等待或執行其餘操做
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock進行嘗試鎖定,無論鎖定與否,方法都將繼續執行 * 能夠根據tryLock的返回值來斷定是否鎖定 * 也能夠指定tryLock的時間,因爲tryLock(time)拋出異常,因此要注意unclock的處理,必須放到finally中 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLock3 rl = new ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
調用lockInterruptibly方法申請鎖的線程能夠對線程interrupt方法作出響應
即若主線程想調用interrupt方法打斷某一個線程的執行,一般來說經過lock方法申請鎖卻由於未申請到鎖而阻塞的線程不能被打斷,而經過lockInterruptibly方法申請鎖的線程阻塞時能夠對主線程調用interrupt方法做出響應而被打斷
簡言之:lockInterruptibly方法的做用是使在一個線程在等待鎖的過程當中,能夠被打斷
下面的代碼能夠演示當線程t1霸佔鎖使得線程t2一直等待得到鎖而阻塞,t2使用lockInterruptibly方法代替lock方法來聲明申請鎖,主線程能夠經過調用線程t2對象的interrupt方法打斷線程t2的執行——「別等了,哥們」
線程t1在獲取/申請鎖的過程中不響應中斷(lock方法),而t2在獲取/申請鎖的過程響應中斷(lockInterruptibly方法)
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class ReentrantLock4 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(()->{ try { lock.lock(); System.out.println("t1 start"); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); System.out.println("t1 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t1.start(); Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //能夠對interrupt()方法作出響應 System.out.println("t2 start"); TimeUnit.SECONDS.sleep(5); System.out.println("t2 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt(); //打斷線程2的等待 } }
ReentrantLock在建立時能夠被指定爲公平鎖,而synchronized是非公平鎖
公平鎖:某個對象的鎖對全部線程都是公平的,先到先得。每次加鎖前都會檢查隊列裏面有沒有排隊等待的線程,有則排隊等待,沒有才會嘗試獲取鎖。
非公平鎖:當一個線程採用非公平鎖這種方式獲取鎖時,該線程會首先去嘗試獲取鎖而不是等待。若是沒有後去成功,那麼它纔會去隊列裏面等待。
下面代碼使用公平鎖,結果應該是兩個線程交替打印,若建立重入鎖時沒有傳入true,則打印結果沒法預測
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock5 extends Thread { private static ReentrantLock lock=new ReentrantLock(true); //參數爲true表示爲公平鎖,請對比輸出結果 public void run() { for(int i=0; i<100; i++) { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"得到鎖"); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLock5 rl=new ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }
ThreadLocal是使用空間換時間,synchronized是使用時間換空間
簡言之:ThreadLocal中存放的數據每一個線程獨立一份,各個線程之間的ThreadLocal互不影響
當單個線程能夠獨立維護一個變量,不須要或不該該被其餘線程修改這個變量,則可使用TreadLocal,好比在Hibernate中session就存在與ThreadLocal中,避免synchronized的使用
注意:ThreadLocal可能會致使內存泄漏
下面代碼結果應該是線程t1經過get方法不能獲取Person對象(獲取了null)
import java.util.concurrent.TimeUnit; public class ThreadLocal2 { //volatile static Person p = new Person(); static ThreadLocal<Person> tl = new ThreadLocal<>(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tl.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person()); }).start(); } static class Person { String name = "zhangsan"; } }
詳見https://blog.csdn.net/cselmu9/article/details/51366946
兩個特色
在任何狀況下,單例類永遠只有一個實例存在
單例須要有能力爲整個系統提供這一惟一實例
下面的程序使用了靜態內置類的方式來實現單例模式
能夠實現懶加載並且線程安全
import java.util.Arrays; public class Singleton { private Singleton() { System.out.println("single"); } private static class Inner { private static Singleton s = new Singleton(); } public static Singleton getSingle() { return Inner.s; } public static void main(String[] args) { Thread[] ths = new Thread[200]; for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ System.out.println(Singleton.getSingle()); }); } Arrays.asList(ths).forEach(o->o.start()); } }
使用早期的同步容器以及Collections.synchronized**方法的不足之處,本文省略不表,請閱讀:
http://blog.csdn.net/itm_hadf/article/details/7506529*
使用新的併發容器
http://xuganggogo.iteye.com/blog/321630
有N張火車票,每張票都有一個編號,同時有10個窗口對外售票,寫一個模擬程序
實現一
如下是最基本的實現,弊端爲ArrayList的remove方法不是同步的,可能重複remove同一張票;判斷剩餘票數的代碼也不是同步的,可能賣出超過10000張票
/** * 有N張火車票,每張票都有一個編號 * 同時有10個窗口對外售票 * 請寫一個模擬程序 * * 分析下面的程序可能會產生哪些問題? * 重複銷售?超量銷售? * * @author 馬士兵 */ import java.util.ArrayList; import java.util.List; public class TicketSeller1 { static List<String> tickets = new ArrayList<>(); static { for(int i=0; i<10000; i++) tickets.add("票編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { System.out.println("銷售了--" + tickets.remove(0)); } }).start(); } } }
使用Vector或者Collections.synchronizedXXX這種同步容器來實現同步
仍然存在弊端,由於判斷條件的代碼和操做容器的代碼分離了,雖然Vector的remove方法是同步的,但判斷是否還有票的代碼不是同步的,可能售出超過10000張票(若打開while塊內線程sleep的語句,能夠模擬這種出錯,緣由是多個線程進入到了while塊同時執行remove操做了)
即從判斷到操做容器這兩步中間可能會出現併發問題,由於兩步操做不一樣步
解決方法呼之欲出了,使判斷(size>0)和操做容器(remove方法)的代碼同步,即實現原子性
import java.util.Vector; import java.util.concurrent.TimeUnit; public class TicketSeller2 { static Vector<String> tickets = new Vector<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 編號:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { /*try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("銷售了--" + tickets.remove(0)); } }).start(); } } }
經過在判斷和操做這兩步外部加synchronized塊實現原子性
但這種實現的效率並不特別高
import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; public class TicketSeller3 { static List<String> tickets = new LinkedList<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 編號:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { synchronized(tickets) { if(tickets.size() <= 0) { break; } try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("銷售了--" + tickets.remove(0)); } } }).start(); } } }
使用併發容器ConcurrentLinkedQueue(併發鏈表隊列)提升併發性
其中poll方法是同步的,操做代碼並無加鎖,但能夠實現高效率線程安全操做
原理是隊列的特性:隊列內通常不容許有值爲null的元素(即便容器容許null值也應該做判斷避免傳入null值),若沒有元素則返回null,先取出後判斷,並非先判斷後取出,因此完美解決了判斷和操做兩部分的同步問題
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; public class TicketSeller4 { static Queue<String> tickets = new ConcurrentLinkedQueue<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 編號:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { String s = tickets.poll(); if(s == null) { break; } else { System.out.println("銷售了--" + s); } } }).start(); } } }
可見併發容器的優點所在,下文將介紹各個經常使用的併發容器類
如下程序能夠演示高併發下不一樣併發Map容器的添加效率
小關鍵:這裏的CountDownLatch主要是爲了限制主線程等待添加操做完成後再繼續執行
import java.util.Arrays; import java.util.Hashtable; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; public class T01_ConcurrentMap { public static void main(String[] args) { //Map<String, String> map = new ConcurrentHashMap<>(); Map<String, String> map = new ConcurrentSkipListMap<>(); //高併發而且排序 //Map<String, String> map = new Hashtable<>(); //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX //TreeMap Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ for(int j=0; j<10000; j++) { map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); } }
上文代碼執行結果能夠發現執行結果中在併發的狀況下使用ConcurrentHashMap的效率比HashTable高,緣由是HashTable在添加操做時會鎖定整個容器,只響應一個線程的添加操做;而ConcurrentHashMap則是將容器分段(默認16段)(存疑,1.8以後底層改變,CAS替代分段鎖,具體自查),併發操做時只鎖定其中一段
在高併發且須要對元素排序的狀況下,可使用ConcurrentSkipListMap提升效率
關於SkipList(跳躍列表)可參閱http://blog.csdn.net/sunxianghuang/article/details/52221913
跳錶能夠理解爲「多鏈鏈表」,是一種用空間換時間的數據結構,經過在每一個節點中增長了向前的指針,從而提高查找的效率
寫時複製容器(copy on write),當添加/刪除等修改元素操做發生時,將逐一複製原列表值到新容器,修改操做(即寫的操做)完成後再將原容器的引用調整至新容器,從而實現讀取數據的線程安全
主要是讀寫分離的思想:在寫的過程當中引用並未指向新容器,因此讀操做仍然在舊容器中讀取,待寫操做完成後才更新新容器的引用
CopyOnWriteArrayList的實現原理是,在一個線程開始遍歷(建立Iterator對象)時,內部會建立一個「快照」數組,遍歷基於這個快照Iterator進行,在遍歷過程當中這個快照數組不會改變,也就不會拋出
ConcurrentModificationException
。若是在遍歷的過程當中有其餘線程嘗試改變數組的內容,就會拷貝一份新的數據進行變動,然後面再來訪問這個數組的線程,看到的就是變動過的數組。
其實CopyOnWirteArrayList主要就是解決了併發環境下修改操做和對容器遍歷操做的衝突(修改時另外一線程開始遍歷容器會拋出ConcurrentModificationException)
能夠查閱https://juejin.im/post/5aaa2ba8f265da239530b69e
有關ConcurrentModificationException的拓展資料查閱https://juejin.im/post/5a992a0d6fb9a028e46e17ef
多線程環境下,寫時效率低,讀時效率高,適合寫少讀多的環境
下面的程序能夠演示CopyOnWirteArrayList的讀/寫的效率
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList; public class T02_CopyOnWriteList { public static void main(String[] args) { List<String> lists = //new ArrayList<>(); //這個會出併發問題! //new Vector(); new CopyOnWriteArrayList<>(); Random r = new Random(); Thread[] ths = new Thread[100]; for(int i=0; i<ths.length; i++) { Runnable task = new Runnable() { @Override public void run() { for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; ths[i] = new Thread(task); } runAndComputeTime(ths); System.out.println(lists.size()); } static void runAndComputeTime(Thread[] ths) { long s1 = System.currentTimeMillis(); Arrays.asList(ths).forEach(t->t.start()); Arrays.asList(ths).forEach(t->{ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long s2 = System.currentTimeMillis(); System.out.println(s2 - s1); } }
在併發環境下用得較多的容器
是無界隊列,容量取決於內存
下面的代碼演示基本使用ConcurrentLinkedQueue,poll和peek方法的區別是poll方法將返回並移除元素,peek方法是獲取元素但不移除
另外有ConcurrentLinkedDeque併發雙向鏈表隊列
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strs = new ConcurrentLinkedQueue<>(); for(int i=0; i<10; i++) { strs.offer("a" + i); //add } System.out.println(strs); System.out.println(strs.size()); System.out.println(strs.poll()); System.out.println(strs.size()); System.out.println(strs.peek()); System.out.println(strs.size()); //雙端隊列Deque } }
BlockingQueue(阻塞式隊列)實際上是Java對生產者/消費者模式的實現
其中LinkedBlockingQueue是使用鏈表實現的阻塞式無界隊列,put方法在容器已滿時將等待,而take方法在容器爲空時將等待(下文例題中有實現這種生產者/消費者模式的容器的程序)
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class T05_LinkedBlockingQueue { static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); static Random r = new Random(); public static void main(String[] args) { new Thread(() -> { for (int i = 0; i < 100; i++) { try { strs.put("a" + i); //若是滿了,就會等待 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { for (;;) { try { System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //若是空了,就會等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }
底層使用數組實現,容量有限制,offer方法能夠向容器添加元素,並返回是否添加成功的布爾值(若容器已滿則不添加元素並返回false,而使用add方法則會拋出異常)
且offer方法能夠傳入參數設置時間間隔,在此段時間間隔內不斷添加,超時則放棄添加操做並返回false
而put方法在容器已滿時將阻塞
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class T06_ArrayBlockingQueue { static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); static Random r = new Random(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { strs.put("a" + i); } strs.put("aaa"); //滿了就會等待,程序阻塞 //strs.add("aaa"); //strs.offer("aaa"); //strs.offer("aaa", 1, TimeUnit.SECONDS); System.out.println(strs); } }
TransferQueue有transfer方法(將元素放入容器),這個方法的做用是當多個消費者線程等待獲取隊列中的元素時,此時生產者再生產一個元素,不放入隊列中,而是能夠直接交給消費者線程,但使用了transfer方法,若沒有消費者線程等待獲取元素,使用transfer方法的線程將阻塞直至消費者線程出現
能夠提升併發效率
import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); /*new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ //strs.transfer("aaa"); strs.put("aaa"); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
是一種特殊的TransferQueue,特殊在容量爲0
不能調用add方法,只能調用put方法(將阻塞等待消費者線程)
若沒有消費者線程等待獲取容器中的值,則會拋出異常IllegalStateException:Queue full
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; public class T09_SynchronusQueue { //容量爲0 public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strs = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.put("aaa"); //阻塞等待消費者消費 //strs.add("aaa"); System.out.println(strs.size()); } }
一樣,DelayQueue也是一個線程安全的無界隊列
特色是當隊列中的元素到達延遲時間時才能被取出,隊列元素會按照最終執行時間(阻塞結束後到被執行的時間)在隊列中進行排序,頭部爲最終執行時間最長的元素
可使用延遲隊列來執行定時任務
DelayQueue是一個無界阻塞隊列,該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll 移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素。
下面的代碼演示了使用DelayQueue,其中DelayQueue存放的元素須要實現Delayed接口以實現元素延遲計時等功能(以下MyTask類實現了Delay接口)
import java.util.Calendar; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class T07_DelayQueue { static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); } } }
最頂層定義
Executor是一個接口,內部定義了execute(Runnable command)方法,實現類須要實現這個方法編寫須要實現的具體任務
簡言之,Executor的實現類是用於執行某個任務的
import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor{ public static void main(String[] args) { new T01_MyExecutor().execute(()->System.out.println("hello executor")); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } }
ExecutorService是一個繼承Executor的接口,除繼承execute方法外,還定義了一系列其餘關於執行任務的方法(如submit方法)
其中submit方法能夠傳入Callable
與Runnable接口很是類似,Runnable接口內定義了run方法,而Callable接口內部定義了call方法
與Runnable接口的區別在於,Runnable接口的run方法沒有返回值且不能拋出異常,而Callable接口定義的call方法能夠有返回值且能夠拋出異常
Executors是簡化使用Executor接口常見實現類的工具類
其中定義了一些使用的方法好比建立線程池等
具體API自查
若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要時間
使用線程池能夠達到線程的重用,提升性能
下面的程序演示了建立一個固定線程數量的線程池並直接向線程池派發任務並執行(把任務扔進線程池中,線程池中的數個線程將搶着執行任務)
其中shutdown方法的做用是關閉線程池,若線程仍在執行任務則等待線程所有空閒再關閉,有shutdownNow方法能夠強制關閉線程池
isTerminated方法做用是檢測此時線程池內任務是否被執行完畢(所有線程空閒)
isShutdown方法注意是檢測該線程池是否執行了shutdown方法
在Java中,若是須要設定代碼執行的最長時間,即超時,能夠用Java線程池ExecutorService類配合Future接口來實現。 Future接口是Java標準API的一部分,在java.util.concurrent包中。Future接口是Java線程Future模式的實 現,能夠來進行異步計算。
Future模式能夠這樣來描述:我有一個任務,提交給了Future,Future替我完成這個任務。期間我本身能夠去作任何想作的事情。一段時 間以後,我就即可以從Future那兒取出結果。就至關於下了一張定貨單,一段時間後能夠拿着提訂單來提貨,這期間能夠幹別的任何事情。其中Future 接口就是定貨單,真正處理訂單的是Executor類,它根據Future接口的要求來生產產品。
Future接口提供方法來檢測任務是否被執行完,等待任務執行完得到結果,也能夠設置任務執行的超時時間
這個設置超時的方法就是實現Java程序執行超時的關鍵
在Future接口中聲明瞭5個方法
FutureTask實現了Future接口
不直接構造Future對象,也可使用ExecutorService.submit方法來得到Future對象,submit方法即支持以 Callable接口類型,也支持Runnable接口做爲參數,具備很大的靈活性
下面的程序演示了兩種獲取Future對象的方式並經過講task對象傳入線程構造函數開啓線程使用,其中FutureTask的泛型表示獲取值的類型
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(f.get()); System.out.println(f.isDone()); } }
下面程序使用FutureTask配合固定線程數的線程池實現了並行計算1-200000範圍內得素數並對比了串行計算和並行計算的效率
其中不均分計算範圍是由於數值越大計算量越大
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() throws Exception { List<Integer> r = getPrime(startPos, endPos); return r; } } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for(int i=start; i<=end; i++) { if(isPrime(i)) results.add(i); } return results; } }
最基本的線程池,線程數固定
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
當任務須要時就自動建立新線程(不限制線程數量),線程默認超過60s空閒則銷燬
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T08_CachedPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); System.out.println(service); } }
這種線程池的做用是保證多個任務順序執行
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i<5; i++) { final int j = i; service.execute(()->{ System.out.println(j + " " + Thread.currentThread().getName()); }); } } }
下面程序演示了使用固定頻率執行任務
具體API自查
import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); } }
通常來說每一個線程維護一個任務隊列,每一個線程只執行分配給自身的任務,而使用工做竊取線程池當有空閒線程時空閒線程將主動竊取另外線程的任務來執行
WorkStealingPool底層是由ForkJoinPool線程池實現的
注意產生的是守護線程
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); System.out.println(Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //daemon service.execute(new R(2000)); //因爲產生的是精靈線程(守護線程、後臺線程),主線程不阻塞的話,看不到輸出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }
任務的切分(切分子任務到多小)和合並能夠由開發者指定,而ForkJoinPool將根據切分和合並的規則來建立線程並由ForkJoinPool維護線程
能夠參閱:https://www.jianshu.com/p/8d7e3cc892cf
下面程序演示了計算長度爲1000000的,內部存放隨機數值的數組的和
其中使用static inner class是爲了防止包可見致使命名衝突
import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } /* static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } */ static class AddTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }
其實全部線程池的底層都是使用ThreadPoolExecutor做爲支撐的,能夠本身自定義線程池,指定不一樣的特定策略(最小/最大線程數、使用什麼任務隊列和執行策略等)
默認使用多線程並行計算
具體自查
import java.util.ArrayList; import java.util.List; import java.util.Random; public class T14_ParallelStreamAPI { public static void main(String[] args) { List<Integer> nums = new ArrayList<>(); Random r = new Random(); for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000)); //System.out.println(nums); long start = System.currentTimeMillis(); nums.forEach(v->isPrime(v)); long end = System.currentTimeMillis(); System.out.println(end - start); //使用parallel stream api start = System.currentTimeMillis(); nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime); end = System.currentTimeMillis(); System.out.println(end - start); } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } }
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 分析下面這個程序,能完成這個功能嗎? * @author mashibing */ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer1 { List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer1 c = new MyContainer1(); new Thread(() -> { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { while(true) { if(c.size() == 5) { break; } } System.out.println("t2 結束"); }, "t2").start(); } }
上述程序將不如咱們所願當size=5時break,而是不斷添加到10次仍不打印"t2 結束",且程序在打印10次"add"後仍然沒法退出
緣由是size對線程t2不可見(一直爲0),能夠在lists變量聲明volatile關鍵字解決
解法一
在lists變量聲明volatile關鍵字
但這種解法有幾個問題:
使用wait和notify方法解決
鎖對象調用wait和notify方法的做用
- wait方法:會讓當前線程進入等待,直到另外一個線程調用同一個對象的notify()或notifyAll()方法
- 調用wait方法時將放棄鎖/控制權
- notify方法:喚醒因調用這個對象wait()方法而阻塞的線程
- 調用notify方法時將不會放棄鎖/控制權
- 當執行notify方法時,會喚醒一個處於等待該對象鎖的線程,而後繼續往下執行,直到執行完退出對象鎖鎖住的區域(synchronized修飾的代碼塊)後再釋放鎖
- 故應該儘可能在線程調用notify/notifyAll()後,當即退出臨界區,即notify方法後面避免出現更多耗時的代碼
注意:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer3 { //添加volatile,使t2可以獲得通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer3 c = new MyContainer3(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2啓動"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 結束"); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1啓動"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }
使用Latch(門閂)替代wait和notify方法來進行通知,好處是通訊方式簡單,同時也能夠指定等待時間
CountDownLatch類位於java.util.concurrent包下,利用它能夠實現相似計數器的功能
好比有一個任務A,它要等待其餘4個任務執行完畢以後才能執行,此時就能夠利用CountDownLatch來實現這種功能了
其中使用await和countdown方法替代wait和notify方法
- CountDownLatch(int count) //構造一個用給定計數初始化的 CountDownLatch。
- void await() // 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷
- boolean await(long timeout, TimeUnit unit) // 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間
- void countDown() // 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程
- long getCount() // 返回當前計數
- String toString() // 返回標識此鎖存器及其狀態的字符串
CountDownLatch不涉及鎖定,當count的值爲零時當前線程繼續運行
簡言之,CountDownLatch不是鎖,而是一個對全部線程可見的、有令線程等待的功能的計數器(就像是一位嚴格的母親,要求兒子必須等5位大人所有動筷才能動筷,其中每位大人第一次動筷母親內心的計數器就減一)
這種方式不須要加鎖,性能比上面的解法要好,但我的疑問在是否會出現解法一中的t2執行時機不許確的弊端(經驗證,在t2結束代碼前使t2等待5秒時將出現這種弊端)
當不涉及同步,只涉及線程通訊的時候,用synchronized + wait/notify(加鎖)就顯得過重了,應該考慮使用CountDownLatch/CyclicBarrier/Semaphore代替
能夠查看https://www.cnblogs.com/dolphin0520/p/3920397.html
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class MyContainer5 { // 添加volatile,使t2可以獲得通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer5 c = new MyContainer5(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2啓動"); if (c.size() != 5) { try { latch.await(); //也能夠指定等待時間 //latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 結束"); }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1啓動"); for (int i = 0; i < 10; i++) { c.add(new Object()); System.out.println("add " + i); if (c.size() == 5) { // 打開門閂,讓t2得以執行 latch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); } }
其中使用while而不是if來執行wait方法的緣由是:當使用if判斷時只會在阻塞前判斷一次,阻塞結束直接放行不做二次判斷,但此時若實際條件被其餘線程改變成應該再次阻塞,則該線程放行執行會出現錯誤(如容器原本已滿,if判斷爲容器已滿,阻塞put方法,若兩個生產者線程均進入到put方法阻塞,當容器變爲未滿狀態時喚醒兩個阻塞線程直接放行,某一個生產者線程操做使容器已滿,則put方法實際應該被阻塞,但if語句再也不判斷,後來執行的生產者線程繼續生產,從而使容器溢出發生錯誤)
注意:同步代碼塊/synchronized塊即便線程在代碼塊內被wait,喚醒後依然須要獲取鎖後才能繼續執行,不然繼續阻塞等待鎖
而while循環判斷這個條件,能夠解決這個問題
Effective Java一書中說明了wait方法絕大部分都是配合while來使用的
另外,使用notifyAll方法而不是notify方法的緣由是notify方法只能喚醒一個,可能喚醒的是同類線程(生產者喚醒生產者使得while判斷後兩個生產者均wait())使整個程序出現假死
Effective Java一書中也說明了應該永遠使用notifyAll方法,不使用notify方法
/** * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法, * 可以支持2個生產者線程以及10個消費者線程的阻塞調用 * * 使用wait和notify/notifyAll來實現 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想一想爲何用while而不是用if? try { this.wait(); // effective java 放棄鎖,使得兩個生產者線程都可進入同步代碼塊執行到這一行,先獲取鎖的先往下執行,未獲取鎖的暫時等待 } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消費者線程進行消費 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生產者進行生產 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //啓動消費者線程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啓動生產者線程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
使用ReentrantLock做爲鎖且配合Condition對象使用能夠精確喚醒/使等待具體的生產者/消費者線程
其中Condition是依靠「誰在這個方法執行到消費者.await方法來判斷誰是消費者」的,並不是直接指定哪一個線程是生產者/消費者(如get方法內某線程執行到了consumer.await()則這個線程就被認爲是消費者了)
線程的生產者/消費者之分是由線程內部執行什麼方法來定義的,並不是線程之間有所不同,全部線程都是同樣的,只是執行的方法不同而區分爲生產者和消費者
/** * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法, * 可以支持2個生產者線程以及10個消費者線程的阻塞調用 * * 使用wait和notify/notifyAll來實現 * * 使用Lock和Condition來實現 * 對比兩種方式,Condition的方式能夠更加精確的指定哪些線程被喚醒 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { //想一想爲何用while而不是用if? producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消費者線程進行消費 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生產者進行生產 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //啓動消費者線程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啓動生產者線程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }