在[高併發Java 二] 多線程基礎中,咱們已經初步提到了基本的線程同步操做。此次要提到的是在併發包中的同步控制工具。java
ReentrantLock感受上是synchronized的加強版,synchronized的特色是使用簡單,一切交給JVM去處理,可是功能上是比較薄弱的。在JDK1.5以前,ReentrantLock的性能要好於synchronized,因爲對JVM進行了優化,如今的JDK版本中,二者性能是不相上下的。若是是簡單的實現,不要刻意去使用ReentrantLock。node
相比於synchronized,ReentrantLock在功能上更加豐富,它具備可重入、可中斷、可限時、公平鎖等特色。設計模式
首先咱們經過一個例子來講明ReentrantLock最初步的用法:安全
package test; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 10000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { Test test = new Test(); Thread t1 = new Thread(test); Thread t2 = new Thread(test); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } }
有兩個線程都對i進行++操做,爲了保證線程安全,使用了 ReentrantLock,從用法上能夠看出,與 synchronized相比, ReentrantLock就稍微複雜一點。由於必須在finally中進行解鎖操做,若是不在 finally解鎖,有可能代碼出現異常鎖沒被釋放,而synchronized是由JVM來釋放鎖。多線程
那麼ReentrantLock到底有哪些優秀的特色呢?併發
單線程能夠重複進入,但要重複退出ide
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
因爲ReentrantLock是重入鎖,因此能夠反覆獲得相同的一把鎖,它有一個與鎖相關的獲取計數器,若是擁有鎖的某個線程再次獲得鎖,那麼獲取計數器就加1,而後鎖須要被釋放兩次才能得到真正釋放(重入鎖)。這模仿了 synchronized 的語義;若是線程進入由線程已經擁有的監控器保護的 synchronized 塊,就容許線程繼續進行,當線程退出第二個(或者後續) synchronized 塊的時候,不釋放鎖,只有線程退出它進入的監控器保護的第一個synchronized 塊時,才釋放鎖。函數
public class Child extends Father implements Runnable{ final static Child child = new Child();//爲了保證鎖惟一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 調用本身類中其餘的synchronized方法 } private synchronized void doAnotherThing() { super.doSomething(); // 調用父類的synchronized方法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
咱們能夠看到一個線程進入不一樣的 synchronized方法,是不會釋放以前獲得的鎖的。因此輸出仍是順序輸出。因此synchronized也是重入鎖高併發
輸出:工具
1child.doSomething() 2father.doSomething() 3child.doAnotherThing() 1child.doSomething() 2father.doSomething() 3child.doAnotherThing() 1child.doSomething() 2father.doSomething() 3child.doAnotherThing() ...
與synchronized不一樣的是,ReentrantLock對中斷是有響應的。中斷相關知識查看[高併發Java 二] 多線程基礎
普通的lock.lock()是不能響應中斷的,lock.lockInterruptibly()可以響應中斷。
咱們模擬出一個死鎖現場,而後用中斷來處理死鎖
package test; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public Test(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock1.lockInterruptibly(); } } catch (Exception e) { // TODO: handle exception } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":線程退出"); } } public static void main(String[] args) throws InterruptedException { Test t1 = new Test(1); Test t2 = new Test(2); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t2); thread1.start(); thread2.start(); Thread.sleep(1000); //DeadlockChecker.check(); } static class DeadlockChecker { private final static ThreadMXBean mbean = ManagementFactory .getThreadMXBean(); final static Runnable deadlockChecker = new Runnable() { @Override public void run() { // TODO Auto-generated method stub while (true) { long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if (deadlockedThreadIds != null) { ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for (Thread t : Thread.getAllStackTraces().keySet()) { for (int i = 0; i < threadInfos.length; i++) { if(t.getId() == threadInfos[i].getThreadId()) { t.interrupt(); } } } } try { Thread.sleep(5000); } catch (Exception e) { // TODO: handle exception } } } }; public static void check() { Thread t = new Thread(deadlockChecker); t.setDaemon(true); t.start(); } } }
上述代碼有可能會發生死鎖,線程1獲得lock1,線程2獲得lock2,而後彼此又想得到對方的鎖。
咱們用jstack查看運行上述代碼後的狀況
的確發現了一個死鎖。
DeadlockChecker.check();方法用來檢測死鎖,而後把死鎖的線程中斷。中斷後,線程正常退出。
超時不能得到鎖,就返回false,不會永久等待構成死鎖
使用lock.tryLock(long timeout, TimeUnit unit)來實現可限時鎖,參數爲時間和單位。
舉個例子來講明下可限時:
package test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("get lock failed"); } } catch (Exception e) { } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String[] args) { Test t = new Test(); Thread t1 = new Thread(t); Thread t2 = new Thread(t); t1.start(); t2.start(); } }
使用兩個線程來爭奪一把鎖,當某個線程得到鎖後,sleep6秒,每一個線程都只嘗試5秒去得到鎖。
因此一定有一個線程沒法得到鎖。沒法得到後就直接退出了。
輸出:
get lock failed
使用方式:
public ReentrantLock(boolean fair) public static ReentrantLock fairLock = new ReentrantLock(true);
通常意義上的鎖是不公平的,不必定先來的線程能先獲得鎖,後來的線程就後獲得鎖。不公平的鎖可能會產生飢餓現象。
公平鎖的意思就是,這個鎖能保證線程是先來的先獲得鎖。雖然公平鎖不會產生飢餓現象,可是公平鎖的性能會比非公平鎖差不少。
Condition與ReentrantLock的關係就相似於synchronized與Object.wait()/signal()
await()方法會使當前線程等待,同時釋放當前鎖,當其餘線程中使用signal()時或者signalAll()方法時,線 程會從新得到鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很類似。
awaitUninterruptibly()方法與await()方法基本相同,可是它並不會再等待過程當中響應中斷。 singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒全部在等待中的線程。這和Obejct.notify()方法很相似。
這裏就再也不詳細介紹了。舉個例子來講明:
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { Test t = new Test(); Thread thread = new Thread(t); thread.start(); Thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } }
上述例子很簡單,讓一個線程await住,讓主線程去喚醒它。condition.await()/signal只能在獲得鎖之後使用。
對於鎖來講,它是互斥的排他的。意思就是,只要我得到了鎖,沒人能再得到了。
而對於Semaphore來講,它容許多個線程同時進入臨界區。能夠認爲它是一個共享鎖,可是共享的額度是有限制的,額度用完了,其餘沒有拿到額度的線程仍是要阻塞在臨界區外。當額度爲1時,就相等於lock
下面舉個例子:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Test implements Runnable { final Semaphore semaphore = new Semaphore(5); @Override public void run() { try { semaphore.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + " done"); } catch (Exception e) { e.printStackTrace(); }finally { semaphore.release(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(20); final Test t = new Test(); for (int i = 0; i < 20; i++) { executorService.submit(t); } } }
有一個20個線程的線程池,每一個線程都去 Semaphore的許可,Semaphore的許可只有5個,運行後能夠看到,5個一批,一批一批地輸出。
固然一個線程也能夠一次申請多個許可
public void acquire(int permits) throws InterruptedException
ReadWriteLock是區分功能的鎖。讀和寫是兩種不一樣的功能,讀-讀不互斥,讀-寫互斥,寫-寫互斥。
這樣的設計是併發量提升了,又保證了數據安全。
使用方式:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock();
詳細例子能夠查看 Java實現生產者消費者問題與讀者寫者問題,這裏就不展開了。
倒數計時器
一種典型的場景就是火箭發射。在火箭發射前,爲了保證萬無一失,每每還要進行各項設備、儀器的檢查。 只有等全部檢查完畢後,引擎才能點火。這種場景就很是適合使用CountDownLatch。它可使得點火線程
,等待全部檢查線程所有完工後,再執行
使用方式:
static final CountDownLatch end = new CountDownLatch(10); end.countDown(); end.await();
示意圖:
一個簡單的例子:
package test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test implements Runnable { static final CountDownLatch countDownLatch = new CountDownLatch(10); static final Test t = new Test(); @Override public void run() { try { Thread.sleep(2000); System.out.println("complete"); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(t); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } }
主線程必須等待10個線程所有執行完纔會輸出"end"。
和CountDownLatch類似,也是等待某些線程都作完之後再執行。與CountDownLatch區別在於這個計數器能夠反覆使用。好比,假設咱們將計數器設置爲10。那麼湊齊第一批1 0個線程後,計數器就會歸零,而後接着湊齊下一批10個線程
使用方式:
public CyclicBarrier(int parties, Runnable barrierAction) barrierAction就是當計數器一次計數完成後,系統會執行的動做 await()
示意圖:
下面舉個例子:
package test; import java.util.concurrent.CyclicBarrier; public class Test implements Runnable { private String soldier; private final CyclicBarrier cyclic; public Test(String soldier, CyclicBarrier cyclic) { this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try { //等待全部士兵到齊 cyclic.await(); dowork(); //等待全部士兵完成工做 cyclic.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void dowork() { // TODO Auto-generated method stub try { Thread.sleep(3000); } catch (Exception e) { // TODO: handle exception } System.out.println(soldier + ": done"); } public static class BarrierRun implements Runnable { boolean flag; int n; public BarrierRun(boolean flag, int n) { super(); this.flag = flag; this.n = n; } @Override public void run() { if (flag) { System.out.println(n + "個任務完成"); } else { System.out.println(n + "個集合完成"); flag = true; } } } public static void main(String[] args) { final int n = 10; Thread[] threads = new Thread[n]; boolean flag = false; CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n)); System.out.println("集合"); for (int i = 0; i < n; i++) { System.out.println(i + "報道"); threads[i] = new Thread(new Test("士兵" + i, barrier)); threads[i].start(); } } }
打印結果:
集合 0報道 1報道 2報道 3報道 4報道 5報道 6報道 7報道 8報道 9報道 10個集合完成 士兵5: done 士兵7: done 士兵8: done 士兵3: done 士兵4: done 士兵1: done 士兵6: done 士兵2: done 士兵0: done 士兵9: done 10個任務完成
提供線程阻塞原語
和suspend相似
LockSupport.park(); LockSupport.unpark(t1);
與suspend相比 不容易引發線程凍結
LockSupport的思想呢,和 Semaphore有點類似,內部有一個許可,park的時候拿掉這個許可,unpark的時候申請這個許可。因此若是unpark在park以前,是不會發生線程凍結的。
下面的代碼是[高併發Java 二] 多線程基礎中suspend示例代碼,在使用suspend時會發生死鎖。
package test; import java.util.concurrent.locks.LockSupport; public class Test { static Object u = new Object(); static TestSuspendThread t1 = new TestSuspendThread("t1"); static TestSuspendThread t2 = new TestSuspendThread("t2"); public static class TestSuspendThread extends Thread { public TestSuspendThread(String name) { setName(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); //Thread.currentThread().suspend(); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); // t1.resume(); // t2.resume(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } }
而使用 LockSupport則不會發生死鎖。
另外
park()可以響應中斷,但不拋出異常。中斷響應的結果是,park()函數的返回,能夠從Thread.interrupted()獲得中斷標誌。
在JDK當中有大量地方使用到了park,固然LockSupport的實現也是使用unsafe.park()來實現的。
public static void park() { unsafe.park(false, 0L); }
下面來介紹下ReentrantLock的實現,ReentrantLock的實現主要由3部分組成:
ReentrantLock的父類中會有一個state變量來表示同步的狀態
/** * The synchronization state. */ private volatile int state;
經過CAS操做來設置state來獲取鎖,若是設置成了1,則將鎖的持有者給當前線程
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
若是拿鎖不成功,則會作一個申請
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先,再去申請下試試看tryAcquire,由於此時可能另外一個線程已經釋放了鎖。
若是仍是沒有申請到鎖,就addWaiter,意思是把本身加到等待隊列中去
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
其間還會有屢次嘗試去申請鎖,若是仍是申請不到,就會被掛起
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
同理,若是在unlock操做中,就是釋放了鎖,而後unpark,這裏就不具體講了。
咱們知道HashMap不是一個線程安全的容器,最簡單的方式使HashMap變成線程安全就是使用Collections.synchronizedMap,它是對HashMap的一個包裝
public static Map m=Collections.synchronizedMap(new HashMap());
同理對於List,Set也提供了類似方法。
可是這種方式只適合於併發量比較小的狀況。
咱們來看下synchronizedMap的實現
private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { if (m==null) throw new NullPointerException(); this.m = m; mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} }
它會將HashMap包裝在裏面,而後將HashMap的每一個操做都加上synchronized。
因爲每一個方法都是獲取同一把鎖(mutex),這就意味着,put和remove等操做是互斥的,大大減小了併發量。
下面來看下ConcurrentHashMap是如何實現的
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
在 ConcurrentHashMap內部有一個Segment段,它將大的HashMap切分紅若干個段(小的HashMap),而後讓數據在每一段上Hash,這樣多個線程在不一樣段上的Hash操做必定是線程安全的,因此只須要同步同一個段上的線程就能夠了,這樣實現了鎖的分離,大大增長了併發量。
在使用ConcurrentHashMap.size時會比較麻煩,由於它要統計每一個段的數據和,在這個時候,要把每個段都加上鎖,而後再作數據統計。這個就是把鎖分離後的小小弊端,可是size方法應該是不會被高頻率調用的方法。
在實現上,不使用synchronized和lock.lock而是儘可能使用trylock,同時在HashMap的實現上,也作了一點優化。這裏就不提了。
BlockingQueue不是一個高性能的容器。可是它是一個很是好的共享數據的容器。是典型的生產者和消費者的實現。
示意圖:
具體能夠查看Java實現生產者消費者問題與讀者寫者問題
系列: