Java併發教程-7高級併發對象

目前爲止,該教程重點講述了最初做爲Java平臺一部分的低級別API。這些API對於很是基本的任務來講已經足夠,可是對於更高級的任務就須要更高級的API。特別是針對充分利用了當今多處理器和多核系統的大規模併發應用程序。 本節,咱們將着眼於Java 5.0新增的一些高級併發特徵。大多數特徵已經在新的java.util.concurrent包中實現。Java集合框架中也定義了新的併發數據結構。 

  • 鎖對象提供了能夠簡化許多併發應用的鎖的慣用法。
  • Executors爲加載和管理線程定義了高級API。Executors的實現由java.util.concurrent包提供,提供了適合大規模應用的線程池管理。
  • 併發集合簡化了大型數據集合管理,且極大的減小了同步的需求。
  • 原子變量有減少同步粒度和避免內存一致性錯誤的特徵。
  • 併發隨機數(JDK7)提供了高效的多線程生成僞隨機數的方法。
1.  鎖對象  

同步代碼依賴於一種簡單的可重入鎖。這種鎖使用簡單,但也有諸多限制。 

java.util.concurrent.locks 包提供了更復雜的鎖。咱們不會詳細考察這個包,但會重點關注其最基本的接口,鎖。  鎖對象做用很是相似同步代碼使用的隱式鎖。如同隱式鎖,每次只有一個線程能夠得到鎖對象。經過關聯 Condition 對象,鎖對象也支持wait/notify機制。 鎖對象之於隱式鎖最大的優點在於,它們有能力收回得到鎖的嘗試。若是當前鎖對象不可用,或者鎖請求超時(若是超時時間已指定),tryLock方法會收回獲取鎖的請求。若是在鎖獲取前,另外一個線程發送了一箇中斷,lockInterruptibly方法也會收回獲取鎖的請求。 讓咱們使用鎖對象來解決咱們在 活躍度 中見到的死鎖問題。Alphonse和Gaston已經把本身訓練成能注意到朋友什麼時候要鞠躬。咱們經過要求Friend對象在雙方鞠躬前必須先得到鎖來模擬此次改善。下面是改善後模型的源代碼,Safelock。爲了展現其用途普遍,咱們假設Alphonse和Gaston對於他們新發現的穩定鞠躬的能力是如此入迷,以致於他們沒法不相互鞠躬。 

Java代碼 
  1. import java.util.concurrent.locks.Lock;  
  2. import java.util.concurrent.locks.ReentrantLock;  
  3. import java.util.Random;  
  4.   
  5. public class Safelock {  
  6.     static class Friend {  
  7.         private final String name;  
  8.         private final Lock lock = new ReentrantLock();  
  9.   
  10.         public Friend(String name) {  
  11.             this.name = name;  
  12.         }  
  13.   
  14.         public String getName() {  
  15.             return this.name;  
  16.         }  
  17.   
  18.         public boolean impendingBow(Friend bower) {  
  19.             Boolean myLock = false;  
  20.             Boolean yourLock = false;  
  21.             try {  
  22.                 myLock = lock.tryLock();  
  23.                 yourLock = bower.lock.tryLock();  
  24.             } finally {  
  25.                 if (! (myLock && yourLock)) {  
  26.                     if (myLock) {  
  27.                         lock.unlock();  
  28.                     }  
  29.                     if (yourLock) {  
  30.                         bower.lock.unlock();  
  31.                     }  
  32.                 }  
  33.             }  
  34.             return myLock && yourLock;  
  35.         }  
  36.   
  37.         public void bow(Friend bower) {  
  38.             if (impendingBow(bower)) {  
  39.                 try {  
  40.                     System.out.format("%s: %s has"  
  41.                         + " bowed to me!%n",  
  42.                         this.name, bower.getName());  
  43.                     bower.bowBack(this);  
  44.                 } finally {  
  45.                     lock.unlock();  
  46.                     bower.lock.unlock();  
  47.                 }  
  48.             } else {  
  49.                 System.out.format("%s: %s started"  
  50.                     + " to bow to me, but saw that"  
  51.                     + " I was already bowing to"  
  52.                     + " him.%n",  
  53.                     this.name, bower.getName());  
  54.             }  
  55.         }  
  56.   
  57.         public void bowBack(Friend bower) {  
  58.             System.out.format("%s: %s has" +  
  59.                 " bowed back to me!%n",  
  60.                 this.name, bower.getName());  
  61.         }  
  62. }  
  63.   
  64.     static class BowLoop implements Runnable {  
  65.         private Friend bower;  
  66.         private Friend bowee;  
  67.   
  68.         public BowLoop(Friend bower, Friend bowee) {  
  69.             this.bower = bower;  
  70.             this.bowee = bowee;  
  71.         }  
  72.   
  73.         public void run() {  
  74.             Random random = new Random();  
  75.             for (;;) {  
  76.                 try {  
  77.                     Thread.sleep(random.nextInt(10));  
  78.                 } catch (InterruptedException e) {}  
  79.                 bowee.bow(bower);  
  80.             }  
  81.         }  
  82.     }  
  83.   
  84.     public static void main(String[] args) {  
  85.         final Friend alphonse =  
  86.             new Friend("Alphonse");  
  87.         final Friend gaston =  
  88.             new Friend("Gaston");  
  89.         new Thread(new BowLoop(alphonse, gaston)).start();  
  90.         new Thread(new BowLoop(gaston, alphonse)).start();  
  91.     }  
  92. }  


2.  執行器(Executors)  

在以前全部的例子中,Thread對象表示的線程和Runnable對象表示的線程所執行的任務之間是緊耦合的。這對於小型應用程序來講沒問題,但對於大規模併發應用來講,合理的作法是將線程的建立與管理和程序的其餘部分分離開。封裝這些功能的對象就是執行器,接下來的部分將講詳細描述執行器。  

3.  Executor接口  

java.util.concurrent中包括三個Executor接口: 

  • Executor,一個運行新任務的簡單接口。
  • ExecutorService,擴展了Executor接口。添加了一些用來管理執行器生命週期和任務生命週期的方法。
  • ScheduledExecutorService,擴展了ExecutorService。支持Future和按期執行任務。
一般來講,指向Executor對象的變量應被聲明爲以上三種接口之一,而不是具體的實現類。 

Executor接口  

Executor 接口只有一個execute方法,用來替代一般建立(啓動)線程的方法。例如:r是一個Runnable對象,e是一個Executor對象。可使用 
Java代碼 
  1. e.execute(r);  

來代替 
Java代碼 
  1. (new Thread(r)).start();  

但execute方法沒有定義具體的實現方式。對於不一樣的Executor實現,execute方法多是建立一個新線程並當即啓動,但更有多是使用已有的工做線程運行r,或者將r放入到隊列中等待可用的工做線程。(咱們將在線程池一節中描述工做線程。) 

ExecutorService接口  

ExecutorService 接口在提供了execute方法的同時,新加了更加通用的submit方法。submit方法除了和execute方法同樣能夠接受Runnable對象做爲參數,還能夠接受Callable對象做爲參數。使用Callable對象能夠能使任務返還執行的結果。經過submit方法返回的Future對象能夠讀取Callable任務的執行結果,或是管理Callable任務和Runnable任務的狀態。 ExecutorService也提供了批量運行Callable任務的方法。最後,ExecutorService還提供了一些關閉執行器的方法。若是須要支持即時關閉,執行器所執行的任務須要正確處理中斷。 

ScheduledExecutorService接口  

ScheduledExecutorService 擴展ExecutorService接口並添加了schedule方法。調用schedule方法能夠在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService接口還定義了按照指定時間間隔按期執行任務的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。 

4.  線程池  

在java.util.concurrent包中多數的執行器實現都使用了由工做線程組成的線程池,工做線程獨立於所它所執行的Runnable任務和Callable任務,而且經常使用來執行多個任務。 使用工做線程可使建立線程的開銷最小化。 

在大規模併發應用中,建立大量的Thread對象會佔用佔用大量系統內存,分配和回收這些對象會產生很大的開銷。一種最多見的線程池是固定大小的線程池。這種線程池始終有必定數量的線程在運行,若是一個線程因爲某種緣由終止運行了,線程池會自動建立一個新的線程來代替它。須要執行的任務經過一個內部隊列提交給線程,當沒有更多的工做線程能夠用來執行任務時,隊列保存額外的任務。 使用固定大小的線程池一個很重要的好處是能夠實現優雅退化。例如一個Web服務器,每個HTTP請求都是由一個單獨的線程來處理的,若是爲每個HTTP都建立一個新線程,那麼當系統的開銷超出其能力時,會忽然地對全部請求都中止響應。若是限制Web服務器能夠建立的線程數量,那麼它就沒必要當即處理全部收到的請求,而是在有能力處理請求時才處理。 建立一個使用線程池的執行器最簡單的方法是調用 java.util.concurrent.Executors newFixedThreadPool 方法。Executors類還提供了下列一下方法: 

  • newCachedThreadPool方法建立了一個可擴展的線程池。適合用來啓動不少短任務的應用程序。
  • newSingleThreadExecutor方法建立了每次執行一個任務的執行器。
  • 還有一些建立ScheduledExecutorService執行器的方法。
若是上面的方法都不知足須要,能夠嘗試 java.util.concurrent.ThreadPoolExecutor 或者 java.util.concurrent.ScheduledThreadPoolExecutor 。 

5.  Fork/Joint  

fork/join框架是ExecutorService接口的一種具體實現,目的是爲了幫助你更好地利用多處理器帶來的好處。它是爲那些可以被遞歸地拆解成子任務的工做類型量身設計的。其目的在於可以使用全部可用的運算能力來提高你的應用的性能。   相似於ExecutorService接口的其餘實現,fork/join框架會將任務分發給線程池中的工做線程。fork/join框架的獨特之處在與它使用工做竊取(work-stealing)算法。完成本身的工做而處於空閒的工做線程可以從其餘仍然處於忙碌(busy)狀態的工做線程處竊取等待執行的任務。 fork/join框架的核心是 ForkJoinPool 類,它是對AbstractExecutorService類的擴展。ForkJoinPool實現了工做偷取算法,並能夠執行 ForkJoinTask 任務。 

基本使用方法  

使用fork/join框架的第一步是編寫執行一部分工做的代碼。你的代碼結構看起來應該與下面所示的僞代碼相似: 

Java代碼 
  1. if (當前這個任務工做量足夠小)  
  2.     直接完成這個任務  
  3. else  
  4.     將這個任務或這部分工做分解成兩個部分  
  5.     分別觸發(invoke)這兩個子任務的執行,並等待結果  


你須要將這段代碼包裹在一個ForkJoinTask的子類中。不過,一般狀況下會使用一種更爲具體的的類型,或者是 RecursiveTask (會返回一個結果),或者是 RecursiveAction 。 當你的ForkJoinTask子類準備好了,建立一個表明全部須要完成工做的對象,而後將其做爲參數傳遞給一個ForkJoinPool實例的invoke()方法便可。 

要清晰,先模糊  

想要了解fork/join框架的基本工做原理,接下來的這個例子會有所幫助。假設你想要模糊一張圖片。原始的source圖片由一個整數的數組表示,每一個整數表示一個像素點的顏色數值。與source圖片相同,模糊以後的destination圖片也由一個整數數組表示。 對圖片的模糊操做是經過對source數組中的每個像素點進行處理完成的。處理的過程是這樣的:將每一個像素點的色值取出,與周圍像素的色值(紅、黃、藍三個組成部分)放在一塊兒取平均值,獲得的結果被放入destination數組。由於一張圖片會由一個很大的數組來表示,這個流程會花費一段較長的時間。若是使用fork/join框架來實現這個模糊算法,你就可以藉助多處理器系統的並行處理能力。下面是上述算法結合fork/join框架的一種簡單實現: 

Java代碼 
  1. public class ForkBlur extends RecursiveAction {  
  2. private int[] mSource;  
  3. private int mStart;  
  4. private int mLength;  
  5. private int[] mDestination;  
  6.   
  7. // Processing window size; should be odd.  
  8. private int mBlurWidth = 15;  
  9.   
  10. public ForkBlur(int[] src, int start, int length, int[] dst) {  
  11.     mSource = src;  
  12.     mStart = start;  
  13.     mLength = length;  
  14.     mDestination = dst;  
  15. }  
  16.   
  17. protected void computeDirectly() {  
  18.     int sidePixels = (mBlurWidth - 1) / 2;  
  19.     for (int index = mStart; index < mStart + mLength; index++) {  
  20.         // Calculate average.  
  21.         float rt = 0, gt = 0, bt = 0;  
  22.         for (int mi = -sidePixels; mi <= sidePixels; mi++) {  
  23.             int mindex = Math.min(Math.max(mi + index, 0),  
  24.                                 mSource.length - 1);  
  25.             int pixel = mSource[mindex];  
  26.             rt += (float)((pixel & 0x00ff0000) >> 16)  
  27.                   / mBlurWidth;  
  28.             gt += (float)((pixel & 0x0000ff00) >>  8)  
  29.                   / mBlurWidth;  
  30.             bt += (float)((pixel & 0x000000ff) >>  0)  
  31.                   / mBlurWidth;  
  32.         }  
  33.   
  34.         // Reassemble destination pixel.  
  35.         int dpixel = (0xff000000     ) |  
  36.                (((int)rt) << 16) |  
  37.                (((int)gt) <<  8) |  
  38.                (((int)bt) <<  0);  
  39.         mDestination[index] = dpixel;  
  40.     }  
  41. }  


接下來你須要實現父類中的compute()方法,它會直接執行模糊處理,或者將當前的工做拆分紅兩個更小的任務。數組的長度能夠做爲一個簡單的閥值來判斷任務是應該直接完成仍是應該被拆分。 

Java代碼 
  1. protected static int sThreshold = 100000;  
  2.   
  3. protected void compute() {  
  4.     if (mLength < sThreshold) {  
  5.         computeDirectly();  
  6.         return;  
  7.     }  
  8.   
  9.     int split = mLength / 2;  
  10.   
  11.     invokeAll(new ForkBlur(mSource, mStart, split, mDestination),  
  12.               new ForkBlur(mSource, mStart + split, mLength - split,  
  13.                            mDestination));  
  14. }  


若是前面這個方法是在一個RecursiveAction的子類中,那麼設置任務在ForkJoinPool中執行就再直觀不過了。一般會包含如下一些步驟: 

(1) 建立一個表示全部須要完成工做的任務。 

Java代碼 
  1. // source image pixels are in src  
  2. // destination image pixels are in dst  
  3. ForkBlur fb = new ForkBlur(src, 0, src.length, dst);  


(2) 建立將要用來執行任務的ForkJoinPool。 

Java代碼 
  1. ForkJoinPool pool = new ForkJoinPool();  


(3) 執行任務。 

Java代碼 
  1. pool.invoke(fb);  


想要瀏覽完成的源代碼,請查看 ForkBlur ,其中還包含一些建立destination圖片文件的額外代碼。 

標準實現  

除了可以使用fork/join框架來實現可以在多處理系統中被並行執行的定製化算法(如前文中的ForkBlur.java例子),在Java SE中一些比較經常使用的功能點也已經使用fork/join框架來實現了。在Java SE 8中,java.util.Arrays類的一系列parallelSort()方法就使用了fork/join來實現。這些方法與sort()系列方法很相似,可是經過使用fork/join框架,藉助了併發來完成相關工做。在多處理器系統中,對大數組的並行排序會比串行排序更快。這些方法到底是如何運用fork/join框架並不在本教程的討論範圍內。想要了解更多的信息,請參見Java API文檔。 其餘採用了fork/join框架的方法還包括java.util.streams包中的一些方法,此包是做爲Java SE 8發行版中 Project Lambda 的一部分。想要了解更多信息,請參見 Lambda Expressions 一節。 

6.  併發集合  

java.util.concurrent包囊括了Java集合框架的一些附加類。它們也最容易按照集合類所提供的接口來進行分類: 

  • BlockingQueue定義了一個先進先出的數據結構,當你嘗試往滿隊列中添加元素,或者從空隊列中獲取元素時,將會阻塞或者超時。
  • ConcurrentMapjava.util.Map的子接口,定義了一些有用的原子操做。移除或者替換鍵值對的操做只有當key存在時才能進行,而新增操做只有當key不存在時。使這些操做原子化,能夠避免同步。ConcurrentMap的標準實現是ConcurrentHashMap,它是HashMap的併發模式。
  • ConcurrentNavigableMap是ConcurrentMap的子接口,支持近似匹配。ConcurrentNavigableMap的標準實現是ConcurrentSkipListMap,它是TreeMap的併發模式。
  • 全部這些集合,經過 在集合裏新增對象和訪問或移除對象的操做之間,定義一個happens-before的關係,來幫助程序員避免內存一致性錯誤

7.  原子變量  

java.util.concurrent.atomic 包定義了對單一變量進行原子操做的類。全部的類都提供了get和set方法,可使用它們像讀寫volatile變量同樣讀寫原子類。就是說,同一變量上的一個set操做對於任意後續的get操做存在happens-before關係。原子的compareAndSet方法也有內存一致性特色,就像應用到整型原子變量中的簡單原子算法。   爲了看看這個包如何使用,讓咱們返回到最初用於演示線程干擾的 Counter 類: 

Java代碼 
  1. class Counter {  
  2.     private int c = 0;  
  3.     public void increment() {  
  4.         c++;  
  5.     }  
  6.   
  7.     public void decrement() {  
  8.         c--;  
  9.     }  
  10.   
  11.     public int value() {  
  12.         return c;  
  13.     }  
  14. }  


使用同步是一種使Counter類變得線程安全的方法,如 SynchronizedCounter : 

Java代碼 
  1. class SynchronizedCounter {  
  2. private int c = 0;  
  3. public synchronized void increment() {  
  4. c++;  
  5. }  
  6. public synchronized void decrement() {  
  7. c--;  
  8. }  
  9. public synchronized int value() {  
  10. return c;  
  11. }  
  12. }  


對於這個簡單的類,同步是一種可接受的解決方案。可是對於更復雜的類,咱們可能想要避免沒必要要同步所帶來的活躍度影響。將int替換爲AtomicInteger容許咱們在不進行同步的狀況下阻止線程干擾,如 AtomicCounter : 

Java代碼 
  1. import java.util.concurrent.atomic.AtomicInteger;  
  2. class AtomicCounter {  
  3. private AtomicInteger c = new AtomicInteger(0);  
  4. public void increment() {  
  5. c.incrementAndGet();  
  6. }  
  7.   
  8. public void decrement() {  
  9. c.decrementAndGet();  
  10. }  
  11.   
  12. public int value() {  
  13. return c.get();  
  14. }  


8.  併發隨機數  

在JDK7中,java.util.concurrent包含了一個至關便利的類,ThreadLocalRandom,當應用程序指望在多個線程或ForkJoinTasks中使用隨機數時。 

對於併發訪問,使用TheadLocalRandom代替Math.random()能夠減小競爭,從而得到更好的性能。 

你只需調用ThreadLocalRandom.current(), 而後調用它的其中一個方法去獲取一個隨機數便可。下面是一個例子: 

Java代碼 
  1. int r = ThreadLocalRandom.current().nextInt(4,77);  
相關文章
相關標籤/搜索