勿止於結論;持續探索與求證。html
爲何要使用併發 ? 有三點足夠信服的理由:java
不過,併發使用姿式不當,很容易出錯,致使難以估量的損失。可謂是一把雙刃劍。面試
最近,團隊有同窗踩了併發的坑。我想,要不梳理下併發的一些陷阱及相關原理和解決方案吧,以備後用。
算法
並非在全部狀況下都須要使用併發。Java 多線程模型,在併發執行的時候會有線程建立、切換、阻塞、調度的開銷、內存同步的開銷等。編程
在單核 CPU 上,運行 CPU 密集型計算,並不須要使用併發,由於 CPU 自己就是飽和的,使用併發只能帶來沒必要要的線程切換和同步開銷。 在多核 CPU 處理器上,運行 CPU 密集型計算,線程數應該與 CPU 個數同樣,以便於將工做合理分配到每一個 CPU 上,很少很多。 若是線程數大於 CPU 核數,那麼就會有沒必要要的線程切換;若是線程數小於 CPU 核數,就沒法充分利用全部的核。緩存
實際應用中,經常是 RPC 調用和純計算業務邏輯處理的交替執行,也就是 IO 密集型,或者 IO 和 CPU 密集型交叉的任務,則須要線程數遠遠大於 CPU 核數,來避免 線程等待 IO 操做完成以前的無所事事。安全
要討論併發問題,首先要理解,何爲線程安全的 ? 詳解可參閱《Java併發編程實戰》的第二章。多線程
線程安全,是指在多線程執行環境下,併發執行的結果與串行執行的結果始終一致。 這句話有兩層意思: 1. 不會由於線程執行順序不肯定,致使不肯定的結果;2. 多線程併發執行的結果,應該與多線程串行執行的結果一致。它們的差異僅僅體如今速度上,而不是結果上。併發
在具體措施上,表現爲多線程對「含有共享可變狀態的對象」的訪問控制與同步。這裏有兩個前提: 1. 多線程。 單線程執行環境是線程安全的; 2. 共享可變。 不可變的對象是線程安全的;沒有任何寫操做的共享可變對象是線程安全的; 3. 無狀態的對象是線程安全的。異步
多線程環境下,保證併發安全的若干理念:
從一開始設計成線程安全的類,比在之後將類修改爲線程安全的類,要更容易和安全得多。由於線程不安全的類,在實際業務系統中可能已經在各類場景下使用到了,修改爲線程安全的類,會致使性能降低,產生不可預知的後果。
程序狀態的封裝性越好,就越容易實現線程安全的訪問,也更容易維護。
優先考慮使用現有的線程安全的類和同步工具類。
使用不可變量和無狀態對象(一般是應用中的全局無狀態組件)。
不共享變量,好比儘可能使用方法內的局部變量,或者聲明組件爲原型模式。
規定哪些操做組合必須符合原子性,並藉助同步和鎖來實現組合操做的原子性。
以下代碼一所示。先起一個線程,將 isReady 設置爲 true ,而後再進入循環,判斷 isReady 是否爲 true ,爲 true 則退出。
這段代碼是線程安全的嗎? 如何判斷 ? 不妨假設這兩個線程按照代碼順序串行執行。那麼,打印 ready! 以後,不該該有 not ready 的打印。
代碼一:
public class NoVisibility { private boolean isReady = false; public void ready() { isReady = true; } public boolean isReady() { return isReady; } static class NoVisibilityTester { public static void main(String[]args) { NoVisibility noVisibility = new NoVisibility(); new Thread(() -> { noVisibility.ready(); System.out.println(System.nanoTime() + ": ready!"); }).start(); while(true) { if (noVisibility.isReady()) { System.out.println(System.nanoTime() + ": main thread now is ready"); break; } System.out.println(System.nanoTime() + ": not ready"); } System.out.println(System.nanoTime() + ": now exit"); } } }
運行屢次,獲得以下結果。打印 ready! 以後,還有 not ready 的輸出。這是爲何呢?
167774951548450: ready! 167774951549923: not ready 167774951822008: main thread now is ready 167774951857190: now exit
這裏涉及到併發讀寫的最最基本的陷阱:可見性。根據 「Jvm內存模型深度理解」,當線程更新 isReady 以後,只是寫到本身的線程緩存裏,並無當即刷新到主內存中。那麼主線程須要等待一段時間,才能檢測到主內存的 isReady 已經變化了。
若是不打算使用鎖的話,能夠加上可見性修飾符:volatile 。 volatile 會當即將更新的線程緩存值刷新到主內存中,使得全部訪問該共享變量的線程都能當即感知到新的值。volatile 是一個極輕量級的同步機制,經常使用於判斷標誌位是否更新。可是 volatile 並不適合作同步鎖(基於 volatile 的同步是很脆弱的)。
一個含有未受保護的實例變量的對象,在多線程環境中訪問是不安全的。這大概是關於併發陷阱的最經典的栗子。
如代碼二所示:一個 UnSafeObject 含有一個實例變量 i 。在 main 中,建立了 3 個線程,分別會設置 i,而後休眠 200ms ,再獲取 i 。
這是線程安全的嗎 ? 按照上述定義來看,若是併發與串行執行結果一致,那麼應該是:每一個線程都會拿到與本身線程號對應的值。至少不會拿到其餘的線程號。
代碼二:
@Setter @Getter public class UnSafeObject { private int i = 0; public static void main(String[] args) { UnSafeObject unSafeObject = new UnSafeObject(); ThreadStarter.startMultiThreads( (ti) -> { unSafeObject.setI(ti); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } System.out.println("Thread" + ti + ":" + unSafeObject.getI()); } ); } }
打印結果以下:很明顯,每一個線程拿到的值並不必定是它本身設置的。由於在多線程環境下,i 可能被任何一個線程所修改。
Thread2:2 Thread1:2 Thread0:2 Thread2:0 Thread1:0
致使對象 UnSafeObject 的實例變量 i 在多線程環境下訪問不安全的緣由是:JVM 併發機制是基於共享內存模型的。可閱:「Jvm內存模型深度理解」。 這篇文章講得詳細。
怎樣才能將 UnSafeObject 變成線程安全的呢 ? 最簡便的方式是,將 i 聲明爲原子的 AutoInteger。 當只須要一個單變量的原子操做時,使用原子類。
原子類採用的是基於硬件能力提供的 CAS ,能夠安全替代 volatile 的使用。在競爭適度(如何衡量?)的狀況下, CAS 可以提供更好的性能和可伸縮性。 CAS 有個「ABA」的問題,能夠經過增長一個版本號來解決。 CAS 是非阻塞算法,是樂觀鎖的實現方式。在實際系統中也常常會用到,好比 DB 的樂觀鎖,ES 版本控制等。經過問題轉換,將併發修改的範圍映射到原子變量的修改上,能夠拓展非阻塞併發的使用範圍。
詳情可閱:《併發編程實戰》的第十五章。
對於這樣一個簡單例子,你們耳熟能詳。不過,換到真實環境裏,還能看出來麼? 以下代碼三所示。這段代碼有什麼問題呢? LightTcOrderFormat 是一個含有實例變量 tcOrder 的 Component。 咋一看,確實沒啥問題。可是,若是放在多線程環境裏跑一跑,tcOrder 就會被隨意篡改。
代碼三:
@Slf4j @Component public class LightTcOrderFormat extends LightTcOrderDTO implements LightFormat { private TcOrder tcOrder; @Override public LightResultDTO format(WholeOrderSet wholeOrderSet, LightResultDTO lightResultDTO) { // code ... this.tcOrder = wholeOrderSet.getTcOrder(); LightTcOrderDTO lightTcOrderDTO = new LightTcOrderDTO(); BeanUtils.copyProperties(wholeOrderSet.getTcOrder(), this); BeanUtils.copyProperties(this, lightTcOrderDTO); lightResultDTO.setOrder(lightTcOrderDTO); return lightResultDTO; } }
避免措施: LightTcOrderFormat 聲明爲原型模式:@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) ,成爲不共享的對象。
以下代碼四所示,使用了 ConcurrentHashMap 對 map 中的 [key,value] 進行保護。
輸出 final 不必定等於 300000。爲何會這樣呢? 雖然 get 與 put 是原子操做,可是組合成一個 add 方法, add 方法是非原子化的。兩個線程徹底可能,同時執行 get("key") = 5 ; 而後前後 put("key", 6) ,使得最終值爲 6, 而不是 7。
代碼四:
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class UnatomicOperation { private Map<String, Integer> map = new ConcurrentHashMap<>(); public void add(String key) { Integer value = map.get(key); if(value == null) { map.put(key, 1); } else { map.put(key, value + 1); } } public void nonSafeAdd(String key) { map.put(key, map.putIfAbsent(key, 1)+1); } public Integer get(String key) { return map.get(key); } public static void main(String[] args) { UnatomicOperation unatomicOperation = new UnatomicOperation(); ThreadStarter.startMultiThreads( (ti) -> { unatomicOperation.nonSafeAdd("key"); System.out.println(ti + ":" + unatomicOperation.get("key")); } ); System.out.println("final: " + unatomicOperation.get("key")); } }
有小夥伴問:那麼寫成這樣能否 ? 看看 putIfAbsent 的實現,就知道也是不能夠的。由於後者只是比前者表達更加簡潔,但效果是一致的。
public void nonSafeAdd(String key) { map.put(key, map.putIfAbsent(key, 1)+1); }
一系列原子操做組合後的複合操做,若是不具備原子化,也會有線程不安全的問題。
一種解決方案是,對對象的全部須要併發訪問的方法使用 synchronized 關鍵字修飾。若是方法裏的操做耗時都比較平均,不存在耗時很大的操做,這種方法最經濟。詳可閱:「深刻理解 Synchronized」
同步鎖,本質是封閉思想的一種體現。將對共享可變量的訪問,限制在指定的同步方法或由鎖構建的臨界區中。
代碼五:
public class SychronizedOperation { private Map<String, Integer> map = new HashMap<>(); public synchronized void add(String key) { Integer value = map.get(key); if(value == null) { map.put(key, 1); } else { map.put(key, value + 1); } } public synchronized Integer get(String key) { return map.get(key); } public static void main(String[] args) { SychronizedOperation sychronizedOperation = new SychronizedOperation(); ThreadStarter.startMultiThreads( (ti) -> { sychronizedOperation.add("key"); System.out.println(ti + ":" + sychronizedOperation.get("key")); } ); System.out.println("final: " + sychronizedOperation.get("key")); } }
問題:讀操做也要加 synchronized 嗎? 爲何 ?
雖然使用 synchronized 解決了問題,可是稍有不當,會帶來性能問題。 這裏加在方法上,實際上就是對整個 map 加鎖,而 ConcurrentHashMap 是有分段鎖優化的,這樣就將分段鎖優化的優點給去掉了。 那麼,如何在保存 ConcurrentHashMap 的優點基礎上,安全地訪問 key 呢?
這裏實際上設計兩層鎖:1. 給 key 加鎖,是分段的; 2. 給計數加鎖。 這裏能夠將初始化的部分抽離出來單獨加鎖。以下代碼六所示。使用 ConcurrentHashMap + AtomicLong 強強聯合,來解決這個問題。ConcurrentHashMap 給 key 加分段鎖,AtomicLong 給訪問同一個 key 的 value 加鎖;還有一個給 value 爲空時的初始化加鎖。
代碼六:
public class ConcurrentCombinedOperation { private Map<String, AtomicLong> map = new ConcurrentHashMap<>(); private Lock lock = new ReentrantLock(); public long addAndGet(String key) { if(lock.tryLock()) { try { map.putIfAbsent(key, new AtomicLong()); } finally { lock.unlock(); } } return map.get(key).incrementAndGet(); } public long get(String key) { return map.get(key).get(); } }
如上代碼所示,雖然將 get-put 中的 put 解放出來了,可是依然有兩個不足:
所以,還須要進一步進行優化。仔細思考可知,實際上只須要初始化(key 對應的 value 爲空)的時候加鎖便可。如代碼七所示,使用 DCL 來安全初始化 key 對應的 AtomicLong 對象。
代碼七:
public long addAndGetEffective(String key) { init(key); return map.get(key).incrementAndGet(); } private void init(String key) { if (map.get(key) == null) { synchronized (key) { if (map.get(key) == null) { map.put(key, new AtomicLong()); } } } }
問題:以上代碼涉及到 key 的監控對象鎖和分段鎖,是否會出現死鎖問題 ?
上述同步代碼中,分別使用了 synchronized 和 ReentrantLock 。那麼,它們有什麼異同?如何在二者之間進行選擇呢? 如下是一些建議:
原子操做組合的非原子化,還表如今一種經常使用狀況:關聯不一致性。也就是說,兩個變量的變化,必須符合某種一致性規約。好比正方形的邊長與面積,就是同步變化的。以下代碼八所示。 最終輸出狀況,square 與 area 不必定會知足 area = square * square 的關係。
代碼八:
public class RelatedInconsistency { private int square; private int area; public RelatedInconsistency(int square) { this.square = square; computeArea(); } public void set(int square) { this.square = square; computeArea(); System.out.println("square: " + square + " area:" + area); } private void computeArea() { this.area = this.square * this.square; } static class RelatedInconsistencyTest { public static void main(String[]args) throws InterruptedException { RelatedInconsistency relatedInconsistency = new RelatedInconsistency(1); ThreadStarter.startMultiThreads(3000, 10, (th) -> relatedInconsistency.set(th) ); } } }
以下代碼九所示。你能看出問題所在嗎 ? add 也加了 synchronized 關鍵字。 看上去貌似是沒有問題的。
代碼九:
public class EscapedObject { private List<Integer> nums = new ArrayList<>(); private synchronized void add(Integer num) { if (num != null) { nums.add(num); } } public List<Integer> getNums() { return nums; } static class EscapedObjectTester { public static void main(String[] args) throws InterruptedException { EscapedObject escapedObject = new EscapedObject(); escapedObject.add(5); List<Integer> escaped = escapedObject.getNums(); ThreadStarter.startMultiThreads(3, 3, (ti) -> { escaped.add(ti*ti); System.out.println(ti + ":" + escaped); } ); } } }
輸出結果以下。將線程數調大,拋出異常 ConcurrentModificationException 的機率會更大。
1:[5, 0, 1] 0:[5, 0, 1] 1:[5, 0, 1, 1] 0:[5, 0, 1, 1, 0] 1:[5, 0, 1, 1, 0, 4, 1] 0:[5, 0, 1, 1, 0, 4, 1, 0] Exception in thread "Thread-2" java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
爲何還會拋併發修改的異常呢 ? getNums 闖了禍。這個方法將不安全的 nums 暴露出去了。 換個角度說, nums 經過 getNums 這個方法逃逸出去了。 這樣 nums 就可能被多個線程同時更改了。
解決方案:1. 不對外暴漏這個實例變量,僅可經過指定方法訪問(封閉的思想);2. 若是須要獲取這個 nums ,使它變成不可變的。不容許逃逸出去的對象被修改。這實際上遵循了「不可變量老是線程安全的」原理。
public List<Integer> getImmutableNums() { return Collections.unmodifiableList(nums); }
事實上,即便返回不可變的 List, List 裏的對象依然是線程不安全的。由於 List 的逸出,連帶着將 List 裏的對象也逸出了。 所以,對於容器的併發,避免將整個容器都返回出去。
這個問題恐怕是不多人會特別注意到的陷阱。在調用頻繁的實例方法中建立線程池,會致使建立線程數不受控地增加,最終致使應用崩潰。
以下代碼十所示。在局部方法 freqCalledMethod 不斷建立新的局部線程池。只要方法調用足夠次數,就會致使應用崩潰。
代碼十:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UncontrolledLocalThreadPool { public static void main(String[] args) { int n = 100000; for (int i=0; i< n; i++) { freqCalledMethod(); } System.out.println("here"); } public static void freqCalledMethod() { ExecutorService threadExecutor = Executors.newFixedThreadPool(10); for (int i=0; i< 10; i++) { threadExecutor.submit(() -> 9999L * 9999L); } } }
報:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
報這個錯誤的緣由是超出了JVM容許建立的最大線程數。
The "java.lang.OutOfMemoryError: unable to create new native thread" is thrown by the JVM whenever it hit the limit of how many threads it can create. The limit is imposed by the operating system.
線程池本來是用來使得應用中建立的線程數是可控的,結果線程池的建立變得不可控了,顯然也會致使線程數不可控。
解決方案:切忌在大量頻繁調用的實例方法裏建立線程池。建立配置良好的全局線程池。
能夠經過以下程序來測試機器上的最大可建立線程數。TimeUnit.SECONDS.sleep(1000); 是爲了避免讓線程過快的退出。
代碼十一:
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThreadMaxCount { private static AtomicInteger count = new AtomicInteger(); public static void main(String[] args) throws InterruptedException { while (true) { new Thread(() -> { try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { } }).start(); System.out.println("thread num:" + count.incrementAndGet()); TimeUnit.MILLISECONDS.sleep(10); } } }
若是你有一個比較長的業務鏈路,有一些公共數據要在整個鏈路中傳遞。要麼,將公共數據放在方法中逐層傳遞下去,要麼建立一個 ThreadLocal 來保存這些公共數據,在鏈路裏傳遞。
ThreadLocal 是線程本地副本,每一個線程有本身私有的一份數據。經過不共享的思路去避免併發修改問題。不過 ThreadLocal 若是與線程池結合使用,就會有問題。
代碼十二:
public class ThreadLocalLeak { private ThreadLocal<Integer> context = new ThreadLocal(); public ThreadLocalLeak(Integer initValue) { context.set(initValue); } public Integer get() { return context.get(); } public void set(Integer initValue) { context.set(initValue); } public void clear() { context.remove(); } public static void main(String[] args) throws InterruptedException { ThreadLocalLeak threadLocalLeak = new ThreadLocalLeak(5); ExecutorService executor = Executors.newFixedThreadPool(10); executor.execute( () -> { for (int i=0; i<=100000; i++) { System.out.println(System.nanoTime() + " set before:" + Thread.currentThread() + ": " + threadLocalLeak.get()); threadLocalLeak.set(i); System.out.println(System.nanoTime() + " set after:" + Thread.currentThread() + ": " + threadLocalLeak.get()); //threadLocalLeak.clear(); } } ); executor.shutdown(); executor.awaitTermination(3000, TimeUnit.SECONDS); } }
輸出結果以下:
要理解 ThreadLocal 的實現,關鍵是理解 Thread 中含有一個 ThreadLocal.ThreadLocalMap 對象。這個對象實際上就是線程的本地副本。 之因此不直接用 Object, 是由於要實現兩個目標: 1. 安全。 使用泛型來存取副本對象,編寫代碼更加安全,避免強制類型轉換; 2. 須要存儲多個值。ThreadLocalMap 的 Key 是一個 WeakReference[ThreadLocal] ,ThreadLocal 經過 AtomicInteger 實現了 hashCode 的約定,並提供了方法來獲取當前執行線程的本地副本的值。
當線程在線程池中被複用時,執行下一次任務時,就可能拿到上一次任務執行後的殘留數據了。
解決方案:在線程執行完任務後,將 ThrealLocal 中的內容清空。
爲了測試併發陷阱,須要啓動多線程去執行任務。爲避免寫重複代碼,須要先寫個通用的多線程啓動代碼。如代碼十三所示。這段代碼使用了 t.join 方法來同步線程之間的活動,使主線程必須在全部子線程執行以後才退出。 這樣作並無多大問題。不過,使用比較底層的 API 比使用成熟的同步工具類,會更有風險。
代碼十三:
import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; public class ThreadStarter { public static void startMultiThreads(Consumer<Integer> consumer) { try { startMultiThreads(3, 100000, consumer); } catch (InterruptedException e) { System.err.println("error: " + e.getMessage()); } } public static void startMultiThreads(int threadNum, int times, Consumer<Integer> consumer) throws InterruptedException { List<Thread> threadList = new ArrayList<>(); for (int t=0; t < threadNum; t++) { int threadIndex = t; Thread th = new Thread(() -> { for (int i=0; i < times; i++) { consumer.accept(threadIndex); } }); threadList.add(th); th.start(); } for (Thread t: threadList) { t.join(); } } }
這裏須要一個「柵欄」:當全部線程都到達這個柵欄的時候,才觸發後續的活動。這其實是個通用的功能。使用 CountDownLatch 工具來實現更佳。
如代碼十四所示。啓動線程引用了 CountDownLatch 對象。當線程執行完成退出時,就將 CountDownLatch 計數減一。當 CountDownLatch 計數爲 0 時,就會釋放柵欄,讓等待的主線程經過。
代碼十四:
public static void startMultiThreadsV2(int threadNum, int times, Consumer<Integer> consumer) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i=0; i<threadNum; i++) { new Thread( new Worker(countDownLatch, consumer, i, times), "t"+i ).start(); } countDownLatch.await(); } static class Worker implements Runnable { private CountDownLatch countDownLatch; private Consumer consumer; private int threadIndex; private int times; public Worker(CountDownLatch countDownLatch, Consumer consumer, int threadIndex, int times) { this.countDownLatch = countDownLatch; this.consumer = consumer; this.threadIndex = threadIndex; this.times = times; } @Override public void run() { for (int i=0; i < times; i++) { consumer.accept(threadIndex); } countDownLatch.countDown(); } }
Java 併發的這些陷阱,從根本上去追溯,都是由共享內存模型所帶來的。若是換用基於消息投遞的方式,好比 「混合使用ForkJoin+Actor+Future實現一千萬個不重複整數的排序(Scala示例)」 , 天然就不存在這些問題了,固然,消息投遞又會帶來新的問題:好比消息接收不到,消息延遲,處理的異步化,反直覺的編程模型等。
從根因上去探索,換一種思路和作法,看到的空間更爲廣闊。
本文主要梳理了併發的一些陷阱及相關原理和解決方案。要編寫正確的併發程序,須要很是仔細才行。使用不共享、不可變、可見性修飾、封閉訪問、加鎖、同步等機制來保證線程安全性。