在通常性開發中,筆者常常看到不少同窗在對待java併發開發模型中只會使用一些基礎的方法。好比volatile,synchronized。像Lock和atomic這類高級併發包不少人並不常用。我想大部分緣由都是來之於對原理的不屬性致使的。在繁忙的開發工做中,又有誰會很準確的把握和使用正確的併發模型呢?html
因此最近基於這個思想,本人打算把併發控制機制這部分整理成一篇文章。既是對本身掌握知識的一個回憶,也是但願這篇講到的類容能幫助到大部分開發者。java
並行程序開發不可避免地要涉及多線程、多任務的協做和數據共享等問題。在JDK中,提供了多種途徑實現多線程間的併發控制。好比經常使用的:內部鎖、重入鎖、讀寫鎖和信號量。node
在java中,每個線程有一塊工做內存區,其中存放着被全部線程共享的主內存中的變量的值的拷貝。當線程執行時,它在本身的工做內存中操做這些變量。算法
爲了存取一個共享的變量,一個線程一般先獲取鎖定而且清除它的工做內存區,這保證該共享變量從全部線程的共享內存區正確地裝入到線程的工做內存區,當線程解鎖時保證該工做內存區中變量的值協會到共享內存中。sql
當一個線程使用某一個變量時,不論程序是否正確地使用線程同步操做,它獲取的值必定是由它自己或者其餘線程存儲到變量中的值。例如,若是兩個線程把不一樣的值或者對象引用存儲到同一個共享變量中,那麼該變量的值要麼是這個線程的,要麼是那個線程的,共享變量的值不會是由兩個線程的引用值組合而成。編程
一個變量時Java程序能夠存取的一個地址,它不只包括基本類型變量、引用類型變量,並且還包括數組類型變量。保存在主內存區的變量能夠被全部線程共享,可是一個線程存取另外一個線程的參數或者局部變量時不可能的,因此開發人員沒必要擔憂局部變量的線程安全問題。至於內存模型中線程工做內存與主內存的交互請關注http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html , 這裏就再也不作過多介紹。數組
因爲每一個線程都有本身的工做內存區,所以當一個線程改變本身的工做內存中的數據時,對其餘線程來講,多是不可見的。爲此,可使用volatile關鍵字破事全部線程軍讀寫內存中的變量,從而使得volatile變量在多線程間可見。安全
聲明爲volatile的變量能夠作到以下保證:多線程
一、其餘線程對變量的修改,能夠及時反應在當前線程中;併發
二、確保當前線程對volatile變量的修改,能及時寫回到共享內存中,並被其餘線程所見;
三、使用volatile聲明的變量,編譯器會保證其有序性。
同步關鍵字synchronized是Java語言中最爲經常使用的同步方法之一。在JDK早期版本中,synchronized的性能並非太好,值適合於鎖競爭不是特別激烈的場合。在JDK6中,synchronized和非公平
鎖的差距已經縮小。更爲重要的是,synchronized更爲簡潔明瞭,代碼可讀性和維護性比較好。
鎖定一個對象的方法:
public synchronized void method(){}
當method()方法被調用時,調用線程首先必須得到當前對象所,若當前對象鎖被其餘線程持有,這調用線程會等待,犯法結束後,對象鎖會被釋放,以上方法等價於下面的寫法:
public void method(){ synchronized(this){ // do something … } }
其次,使用synchronized還能夠構造同步塊,與同步方法相比,同步塊能夠更爲精確控制同步代碼範圍。一個小的同步代碼很是有離與鎖的快進快出,從而使系統擁有更高的吞吐量。
public void method(Object o){ // before synchronized(o){ // do something ... } // after }
synchronized也能夠用於static函數:
public synchronized static void method(){}
這個地方必定要注意,synchronized的鎖是加在 當前Class對象 上,所以,全部對該方法的調用,都必須得到Class對象的鎖。
雖然synchronized能夠保證對象或者代碼段的線程安全,可是僅使用synchronized仍是不足以控制擁有複雜邏輯的線程交互。爲了實現多線程間的交互,還須要使用Object對象的wait()和notify()方法。
典型用法:
synchronized(obj){ while(<?>){ obj.wait(); // 收到通知後,繼續執行。 } }
在使用wait()方法前,須要得到對象鎖。在wait()方法執行時,當前線程或釋放obj的獨佔鎖,供其餘線程使用。
當等待在obj上線程收到obj.notify()時,它就能從新得到obj的獨佔鎖,並繼續運行。注意了,notify()方法是 隨機喚起 等待在當前對象的某一個線程。
下面是一個阻塞隊列的實現:
public class BlockQueue{ private List list = new ArrayList(); public synchronized Object pop() throws InterruptedException{ while (list.size()==0){ this.wait(); } if (list.size()>0){ return list.remove(0); } else{ return null; } } public synchronized Object put(Object obj){ list.add(obj); this.notify(); } }
synchronized配合wait()、notify()應該是Java開發者必須掌握的基本技能。
Reentrantlock稱爲重入鎖。它比synchronized擁有更增強大的功能,它能夠中斷、可定時。在高併發的狀況下,它比synchronized有明顯的性能優點。
Reentrantlock提供了公平和非公平兩種鎖。公平鎖是對鎖的獲取是先進先出,而非公平鎖是能夠插隊的。固然從性能上分析,非公平鎖的性能要好得多。所以,在無特殊須要,應該優選非公平鎖,可是synchronized提供鎖業不是絕對公平的。Reentrantlock在構造的時候能夠指定鎖是否公平。
在使用重入鎖時,必定要在程序最後釋放鎖。通常釋放鎖的代碼要寫在finally裏。不然,若是程序出現異常,Loack就永遠沒法釋放了。synchronized的鎖是JVM最後自動釋放的。
經典使用方式以下:
try { if (lock.tryLock(5, TimeUnit.SECONDS)) { //若是已經被lock,嘗試等待5s,看是否能夠得到鎖,若是5s後仍然沒法得到鎖則返回false繼續執行 // lock.lockInterruptibly();能夠響應中斷事件 try { //操做 } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); //當前線程被中斷時(interrupt),會拋InterruptedException }
Reentrantlock提供了很是豐富的鎖控制功能,靈活應用這些控制方法,能夠提升應用程序的性能。不過這裏並不是是極力推薦使用Reentrantlock。重入鎖算是JDK中提供的高級開發工具。這裏有一篇文章專門針對 ReentrantLock和synchronized兩種鎖定機制的對比 。
讀寫分離是一種很是常見的數據處理思想。在sql中應該算是必須用到的技術。ReadWriteLock是在JDK5中提供的讀寫分離鎖。讀寫分離鎖能夠有效地幫助減小鎖競爭,以提高系統性能。讀寫分離使用場景主要是若是在系統中,讀操做次數遠遠大於寫操做。使用方式以下:
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); public Object handleRead() throws InterruptedException { try { readLock.lock(); Thread.sleep(1000); return value; }finally{ readLock.unlock(); } } public Object handleRead() throws InterruptedException { try { writeLock.lock(); Thread.sleep(1000); return value; }finally{ writeLock.unlock(); } }
Conditiond對象用於協調多線程間的複雜協做。主要與鎖相關聯。經過Lock接口中的newCondition()方法能夠生成一個與Lock綁定的Condition實例。Condition對象和鎖的關係就如用Object.wait()、Object.notify()兩個函數以及synchronized關鍵字同樣。
這裏能夠把ArrayBlockingQueue的源碼摘出來看一下:
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); // 生成與Lock綁定的Condition notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); // 通知 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 若是隊列爲空 notEmpty.await(); // 則消費者隊列要等待一個非空的信號 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); // 通知put() 線程隊列已有空閒空間 return x; } // other code }
信號量爲多線程協做提供了更爲強大的控制方法。信號量是對鎖的擴展。不管是內部鎖synchronized仍是重入鎖ReentrantLock,一次都容許一個線程訪問一個資源,而信號量卻能夠指定多個線程同時訪問某一個資源。從構造函數能夠看出:
public Semaphore(int permits) {} public Semaphore(int permits, boolean fair){} // 能夠指定是否公平
permits指定了信號量的准入書,也就是同時能申請多少個許可。當每一個線程每次只申請一個許可時,這就至關於指定了同時有多少個線程能夠訪問某一個資源。這裏羅列一下主要方法的使用:
下面來看一下JDK文檔中提供使用信號量的實例。這個實例很好的解釋瞭如何經過信號量控制資源訪問。
public class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); // 申請一個許可 // 同時只能有100個線程進入取得可用項, // 超過100個則須要等待 return getNextAvailableItem(); } public void putItem(Object x) { // 將給定項放回池內,標記爲未被使用 if (markAsUnused(x)) { available.release(); // 新增了一個可用項,釋放一個許可,請求資源的線程被激活一個 } } // 僅做示例參考,非真實數據 protected Object[] items = new Object[MAX_AVAILABLE]; // 用於對象池複用對象 protected boolean[] used = new boolean[MAX_AVAILABLE]; // 標記做用 protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else { return false; } } } return false; } }
此實例簡單實現了一個對象池,對象池最大容量爲100。所以,當同時有100個對象請求時,對象池就會出現資源短缺,未能得到資源的線程就須要等待。當某個線程使用對象完畢後,就須要將對象返回給對象池。此時,因爲可用資源增長,所以,能夠激活一個等待該資源的線程。
在剛開始接觸ThreadLocal,筆者很難理解這個線程局部變量的使用場景。當如今回過頭去看,ThreadLocal是一種多線程間併發訪問變量的解決方案。與synchronized等加鎖的方式不一樣,ThreadLocal徹底不提供鎖,而使用了以空間換時間的手段,爲每一個線程提供變量的獨立副本,以保障線程安全,所以它不是一種數據共享的解決方案。
ThreadLocal是解決線程安全問題一個很好的思路,ThreadLocal類中有一個Map,用於存儲每個線程的變量副本,Map中元素的鍵爲線程對象,而值對應線程的變量副本,因爲Key值不可重複,每個「線程對象」對應線程的「變量副本」,而到達了線程安全。
特別值得注意的地方,從性能上說,ThreadLocal並不具備絕對的又是,在併發量不是很高時,也行加鎖的性能會更好。但做爲一套與鎖徹底無關的線程安全解決方案,在高併發量或者所競爭激烈的場合,使用ThreadLocal能夠在必定程度上減小鎖競爭。
下面是一個ThreadLocal的簡單使用:
public class TestNum { // 經過匿名內部類覆蓋ThreadLocal的initialValue()方法,指定初始值 private static ThreadLocal seqNum = new ThreadLocal() { public Integer initialValue() { return 0; } }; // 獲取下一個序列值 public int getNextNum() { seqNum.set(seqNum.get() + 1); return seqNum.get(); }public static void main(String[] args) { TestNum sn = new TestNum(); //3個線程共享sn,各自產生序列號 TestClient t1 = new TestClient(sn); TestClient t2 = new TestClient(sn); TestClient t3 = new TestClient(sn); t1.start(); t2.start(); t3.start(); } private static class TestClient extends Thread { private TestNum sn; public TestClient(TestNum sn) { this.sn = sn; } public void run() { for (int i = 0; i < 3; i++) { // 每一個線程打出3個序列值 System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn[" + sn.getNextNum() + "]"); } } } }
輸出結果:
thread[Thread-0] –> sn[1] thread[Thread-1] –> sn[1] thread[Thread-2] –> sn[1] thread[Thread-1] –> sn[2] thread[Thread-0] –> sn[2] thread[Thread-1] –> sn[3] thread[Thread-2] –> sn[2] thread[Thread-0] –> sn[3] thread[Thread-2] –> sn[3]
輸出的結果信息能夠發現每一個線程所產生的序號雖然都共享同一個TestNum實例,但它們並無發生相互干擾的狀況,而是各自產生獨立的序列號,這是由於ThreadLocal爲每個線程提供了單獨的副本。
「鎖」是最經常使用的同步方法之一。在日常開發中,常常能看到不少同窗直接把鎖加很大一段代碼上。還有的同窗只會用一種鎖方式解決全部共享問題。顯然這樣的編碼是讓人沒法接受的。特別的在高併發的環境下,激烈的鎖競爭會致使程序的性能降低德更加明顯。所以合理使用鎖對程序的性能直接相關。
一、線程的開銷
在多核狀況下,使用多線程能夠明顯提升系統的性能。可是在實際狀況中,使用多線程的方式會額外增長系統的開銷。相對於單核系統任務自己的資源消耗外,多線程應用還須要維護額外多線程特有的信息。好比,線程自己的元數據,線程調度,線程上下文的切換等。
二、減少鎖持有時間
在使用鎖進行併發控制的程序中,當鎖發生競爭時,單個線程對鎖的持有時間與系統性能有着直接的關係。若是線程持有鎖的時間很長,那麼相對地,鎖的競爭程度也就越激烈。所以,在程序開發過程當中,應該儘量地減小對某個鎖的佔有時間,以減小線程間互斥的可能。好比下面這一段代碼:
public synchronized void syncMehod(){ beforeMethod(); mutexMethod(); afterMethod(); }
此實例若是隻有mutexMethod()方法是有同步須要的,而在beforeMethod(),和afterMethod()並不須要作同步控制。若是beforeMethod(),和afterMethod()分別是重量級的方法,則會花費較長的CPU時間。在這個時候,若是併發量較大時,使用這種同步方案會致使等待線程大量增長。由於當前執行的線程只有在執行完全部任務後,纔會釋放鎖。
下面是優化後的方案,只在必要的時候進行同步,這樣就能明顯減小線程持有鎖的時間,提升系統的吞吐量。代碼以下:
public void syncMehod(){ beforeMethod(); synchronized(this){ mutexMethod(); } afterMethod(); }
三、減小鎖粒度
減少鎖粒度也是一種削弱多線程鎖競爭的一種有效手段,這種技術典型的使用場景就是ConcurrentHashMap這個類。在普通的HashMap中每當對集合進行add()操做或者get()操做時,老是得到集合對象的鎖。這種操做徹底是一種同步行爲,由於鎖是在整個集合對象上的,所以,在高併發時,激烈的鎖競爭會影響到系統的吞吐量。
若是看過源碼的同窗應該知道HashMap是數組+鏈表的方式作實現的。ConcurrentHashMap在HashMap的基礎上將整個HashMap分紅若干個段(Segment),每一個段都是一個子HashMap。若是須要在增長一個新的表項,並非將這個HashMap加鎖,二十搜線根據hashcode獲得該表項應該被存放在哪一個段中,而後對該段加鎖,並完成put()操做。這樣,在多線程環境中,若是多個線程同時進行寫入操做,只要被寫入的項不存在同一個段中,那麼線程間即可以作到真正的並行。具體的實現但願讀者本身花點時間讀一讀ConcurrentHashMap這個類的源碼,這裏就再也不作過多描述了。
四、鎖分離
在前面提起過ReadWriteLock讀寫鎖,那麼讀寫分離的延伸就是鎖的分離。一樣能夠在JDK中找到鎖分離的源碼LinkedBlockingQueue。
public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /* Lock held by take, poll, etc / private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 不能有兩個線程同時讀取數據 try { while (count.get() == 0) { // 若是當前沒有可用數據,一直等待put()的通知 notEmpty.await(); } x = dequeue(); // 從頭部移除一項 c = count.getAndDecrement(); // size減1 if (c > 1) notEmpty.signal(); // 通知其餘take()操做 } finally { takeLock.unlock(); // 釋放鎖 } if (c == capacity) signalNotFull(); // 通知put()操做,已有空餘空間 return x; } public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 不能有兩個線程同時put數據 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { // 隊列滿了 則等待 notFull.await(); } enqueue(node); // 加入隊列 c = count.getAndIncrement();// size加1 if (c + 1 < capacity) notFull.signal(); // 若是有足夠空間,通知其餘線程 } finally { putLock.unlock();// 釋放鎖 } if (c == 0) signalNotEmpty();// 插入成功後,通知take()操做讀取數據 } // other code }
這裏須要說明一下的就是,take()和put()函數是相互獨立的,它們之間不存在鎖競爭關係。只須要在take()和put()各自方法內部分別對takeLock和putLock發生競爭。從而,削弱了鎖競爭的可能性。
五、鎖粗化
上面說到的減少鎖時間和粒度,這樣作就是爲了知足每一個線程持有鎖的時間儘可能短。可是,在粒度上應該把握一個度,若是對用一個鎖不停地進行請求、同步和釋放,其自己也會消耗系統寶貴的資源,反而加大了系統開銷。
咱們須要知道的是,虛擬機在遇到一連串連續的對同一鎖不斷進行請求和釋放的操做時,便會把全部的鎖操做整合成對鎖的一次請求,從而減小對鎖的請求同步次數,這樣的操做叫作鎖的粗化。下面是一段整合實例演示:
public void syncMehod(){ synchronized(lock){ method1(); } synchronized(lock){ method2(); } }
JVM整合後的形式:
public void syncMehod(){ synchronized(lock){ method1(); method2(); } }
所以,這樣的整合給咱們開發人員對鎖粒度的把握給出了很好的演示做用。
上面花了很大篇幅在說鎖的事情,同時也提到過鎖是會帶來必定的上下文切換的額外資源開銷,在高併發時,」鎖「的激烈競爭可能會成爲系統瓶頸。所以,這裏可使用一種非阻塞同步方法。這種無鎖方式依然能保證數據和程序在高併發環境下保持多線程間的一致性。
一、非阻塞同步/無鎖
非阻塞同步方式其實在前面的ThreadLocal中已經有所體現,每一個線程擁有各自獨立的變量副本,所以在並行計算時,無需相互等待。這裏筆者主要推薦一種更爲重要的、基於比較並交換(Compare And Swap)CAS算法的無鎖併發控制方法。
CAS算法的過程:它包含3個參數CAS(V,E,N)。V表示要更新的變量,E表示預期值,N表示新值。僅當V值等於E值時,纔會將V的值設爲N,若是V值和E值不一樣,則說明已經有其餘線程作了更新,則當前線程什麼都不作。最後CAS返回當前V的真實值。CAS操做時抱着樂觀的態度進行的,它老是認爲本身能夠成功完成操做。當多個線程同時使用CAS操做一個變量時,只有一個會勝出,併成功更新,其他俊輝失敗。失敗的線程不會被掛起,僅是被告知失敗,而且容許再次嘗試,固然也容許失敗的線程放棄操做。基於這樣的原理,CAS操做及時沒有鎖,也能夠發現其餘線程對當前線程的干擾,而且進行恰當的處理。
在這裏筆者引入一篇講得很不錯的文章 非阻塞同步算法與CAS(Compare and Swap)無鎖算法 。
二、原子量操做
JDK的java.util.concurrent.atomic包提供了使用無鎖算法實現的原子操做類,代碼內部主要使用了底層native代碼的實現。有興趣的同窗能夠繼續跟蹤一下native層面的代碼。這裏就不貼表層的代碼實現了。
下面主要以一個例子來展現普通同步方法和無鎖同步的性能差距:
public class TestAtomic { private static final int MAX_THREADS = 3; private static final int TASK_COUNT = 3; private static final int TARGET_COUNT = 100 * 10000; private AtomicInteger acount = new AtomicInteger(0); private int count = 0; synchronized int inc() { return ++count; } synchronized int getCount() { return count; } public class SyncThread implements Runnable { String name; long startTime; TestAtomic out; public SyncThread(TestAtomic o, long startTime) { this.out = o; this.startTime = startTime; } @Override public void run() { int v = out.inc(); while (v < TARGET_COUNT) { v = out.inc(); } long endTime = System.currentTimeMillis(); System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } public class AtomicThread implements Runnable { String name; long startTime; public AtomicThread(long startTime) { this.startTime = startTime; } @Override public void run() { int v = acount.incrementAndGet(); while (v < TARGET_COUNT) { v = acount.incrementAndGet(); } long endTime = System.currentTimeMillis(); System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } @Test public void testSync() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); SyncThread sync = new SyncThread(this, startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(sync); } Thread.sleep(10000); } @Test public void testAtomic() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); AtomicThread atomic = new AtomicThread(startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(atomic); } Thread.sleep(10000); } }
測試結果以下:
testSync():
SyncThread spend:201ms, v=1000002 SyncThread spend:201ms, v=1000000 SyncThread spend:201ms, v=1000001 testAtomic(): AtomicThread spend:43ms, v=1000000 AtomicThread spend:44ms, v=1000001 AtomicThread spend:46ms, v=1000002
相信這樣的測試結果將內部鎖和非阻塞同步算法的性能差別體現的很是明顯。所以筆者更推薦直接視同atomic下的這個原子類。
終於把想表達的這些東西整理完成了,其實還有一些想CountDownLatch這樣的類沒有講到。不過上面的所講到的絕對是併發編程中的核心。也許有些讀者朋友能在網上看到不少這樣的知識點,可是我的仍是以爲知識只有在對比的基礎上才能找到它合適的使用場景。所以,這也是筆者整理這篇文章的緣由,也但願這篇文章能幫到更多的同窗。