【Java併發.5】基礎構建模塊

  本章會介紹一些最有用的併發構建模塊,有丶東西(最後一小節,純乾貨)。算法

5.1  同步容器類設計模式

  同步容器類包括 Vector 和 Hashtable ,這些類實現線程安全的方式是:將它們的狀態封裝起來,並對每一個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態。數組

5.1.1  同步容器類的問題緩存

  同步容器類都是線程安全的,但在某些狀況下可能須要額外的客戶端加鎖來保護符合操做。容器上常見的複合操做包括:迭代(反覆訪問元素,直到遍歷完容器中全部元素)、跳轉(根據指定順序找到當前元素的下一個元素)以及條件運算,例如「若沒有則添加」(檢查在Map 中是否存在鍵值 K,若是沒有,就加入二元組(K,V))。在同步線程中,當其餘線程併發地修改容器時,它們可能會出現出乎預料以外的行爲。安全

  程序清單 5-1: Vector 上可能致使混亂結果的複合操做服務器

  public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;   //1:  與 2 同步得到lastIndex 
        return list.get(lastIndex);
    }
    public static void dropLast(Vector list) {
        int lastIndex = list.size() - 1;    //2:  與 1 同步得到lastIndex 
        list.remove(lastIndex);
    }

  這些方法看似沒問題,可是當出現同步時,同時調用 list.get(lastIndex)  和 list.remove(lastIndex) ,可能 get(lastIndex)將拋出 ArrayIndexOutIfBoundsException多線程

  因爲同步容器要遵照同步策略,支持客戶端加鎖, 程序清單 5-2: 在使用客戶端加鎖的 Vector 上的複合操做併發

  public static Object getLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1; 
            return list.get(lastIndex);
        }
    }
    public static void dropLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1;   
            list.remove(lastIndex);
        }
    }

  在調用 size() 和相應的 get() 之間,Vector 的長度可能會發生變化,這種風險在對 Vector 中元素進行迭代時仍然會出現,如程序清單 5-3:可能拋出 ArrayIndexOutIfBoundsException 的迭代操做框架

    for (int i =  0; i < vector.size(); i++) {
            doSometing(vector.get(i));
       }

  對此,咱們可能須要犧牲一些伸縮性,經過在迭代期間持有 Vector 的鎖,能夠防止其餘線程在迭代期間修改 Vectordom

    synchronized (vector) {
            for (int i = 0; i < vector.size(); i++) {
                doSometing(vector.get(i));
            }
        }

  

5.1.2  迭代器與 ConcurrentModificationException

  設計同步容器類的迭代器時並無考慮到併發修改的問題,而且它們表現出的行爲是「及時失敗」(fail-fast)的。這意味着,當它們發現容器在迭代過程當中被修改時,就會拋出一個 ConcurrentModificationException 異常。

  想要避免出現 ConcurrentModificationException 異常,就必須在迭代過程持有容器的鎖。  程序清單 5-5: 經過 Iterator 來迭代 List

    List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>);
        ......
        //可能拋出 ConcurrentModificationException
        for (Widget w: widgetList) {
            doSomething(w);
        }

  然而,開發人員並不但願在迭代期間對容器加鎖,若是容器規模很大,或者在每一個元素上執行操做的時間很長,那麼這些線程將長時間等待。

  若是不但願在迭代期間對容器加鎖,那麼一種替代方法就是「克隆」容器,並在副本上進行迭代。(相似於 ThreadLocal)

 

5.1.3  隱藏迭代器

  雖然加鎖能夠防止迭代器拋出異常,但你必需要記住在全部對共享容器進行迭代的地方都須要加鎖。然而實際狀況要更加複雜,在某些狀況下,迭代器會隱藏起來。如程序清單 5-6: 隱藏在字符串鏈接中的迭代操做

public class HiddenIterator {
    private final Set<Integer> set = new HashSet<>();
    public synchronized void add(Integer i) {
        set.add(i);
    }
    public synchronized void remove(Integer i) {
        set.remove(i);
    }
    public void addTenThings() {
        Random r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        System.out.println("Debug: added ten element to " + set);  // set.toString() 會對容器進行迭代
    }
}

  這裏獲得的教訓是,若是狀態與保護它的同步代碼之間相隔越遠,那麼開發人員就越容易忘記在訪問狀態使用正確的同步。

正如封裝對象的狀態有助於維持不變性條件同樣,封裝對象的同步機制童謠有助於確保實施同步策略

  容器的 hashCodeequals 等方法也會間接地執行迭代操做,當容器做爲另外一個容器的元素或鍵值時,就會出現這種狀況。一樣 containsAll、removeAll、retainAll 等方法都會對容器進行迭代。

  

5.2  併發容器

  同步容器將全部對容器狀態的訪問都串行化,以實現它們的線程安全性。但這種方法的代價是嚴重下降併發性。在 Java5.0 中增長了 ConcurrentHashMap,用來替代同步且基於散列的 Map,以及 CopyOnWriteArrayList,用於在遍歷操做爲主要操做的狀況下代替同步的 List 。在新的 ConcurrentMap 接口中增長了對一些常見覆合操做的支持,例如「若沒有則添加」、替換有條件刪除等。

經過併發容器來代替同步容器,能夠極大地提升伸縮性並下降風險。

  在 Java5.0 增長了兩種新的容器類型: QueueBlockingQueueQueue 用來臨時保存一組待處理的元素。它提供了幾種實現,包括:ConcurrentLinkedQueue,這是一個傳統的先進先出隊列,以及 PriorityQueue,這是一個(非併發的)優先隊列。Queue 上的操做不會阻塞,若是隊列爲空,那麼獲取元素的操做將返回空值。雖然能夠用 List 來模擬 Queue 的行爲(事實上,正是經過 LinkedList 來實現 Queue的),但還須要一個 Queue 類,由於它能去掉 List 的隨機訪問需求,從而實現更高效的併發。

  BlockingQueue 擴展了 Queue,增長了可阻塞的插入和獲取等操做。若是隊列爲空,那麼獲取元素的操做將一直阻塞,直到隊列中出現一個可用的元素。若是隊列已滿,那麼插入元素的操做將一直阻塞,知道隊列中出現可用的空間。在「生產者--消費者」這種設計模式中,阻塞隊列是很是有用的。

 

5.2.1  ConcurrentHashMap

  同步容器類在執行每一個操做期間都持有一個鎖。例如 HashMap.getList.contains,可能包含大量的工做:當遍歷散列桶或鏈表來查找某個特定的對象時,必須在許多元素上調用 equals(而 equals 自己還包含了必定的計算量)。

  與 HashMap 同樣,ConcurrentHashMap 也是一個基於散列的 Map,但它使用了一種徹底不一樣的加鎖策略來提供更高的併發性和伸縮性。ConcurrentHashMap 並非將每一個方法都在同一個鎖上同步並使得每次只能有一個線程訪問容器,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱爲分段鎖

  ConcurrentHashMap 返回的迭代器具備弱一致性(Weakly Consistent),而並不是「及時失敗」。弱一致性的迭代器能夠容忍併發的修改,當建立迭代器時會遍歷已有的元素,並能夠(可是不保證)在迭代器被構造後將修改操做反映給容器。

 

5.2.2  額外的原子 Map 操做

  因爲 ConcurrentHashMap 不能被加鎖來執行獨佔訪問,所以咱們沒法使用客戶端加鎖來建立新的原子操做。可是,一些常見的複合操做,例如「若沒有則添加」、「若相等則移除(Remove-If-Equal)」、「若相等則替換(Replace-If-Equla)」等,都已經實現爲原子操做而且在 ConcurrentMap 的接口中聲明。

 

5.2.3  CopyOnWriteArrayList

  CopyOnWriteArrayList 用於替代同步 List,在某些狀況下它提供了更好的併發性能,而且在迭代期間不須要對容器進行加鎖或複製。

  「寫入時複製(Copy-On-Write)」 容器的線程安全性在於,只要正確地發佈一個事實不可變的對象,那麼在訪問該對象時就再也不須要進一步的同步。在每次修改時,都會建立並從新發佈一個新的容器副本,從而實現可變性。「寫入時複製」容器的迭代器保留一個指向底層基礎數組的引用,這個數組當前位於迭代器的起始位置,因爲它不會被修改,所以在對其進行同步時只需確保數組內容的可見性。

  顯然,每當修改容器時都會複製底層數組,這須要必定的開銷,特別是當容器的規模較大時。

  

5.3  阻塞隊列 和 生產者--消費者

  阻塞隊列提供了可阻塞的 put 和 take 方法,以及支持定時的 offer 和 poll 方法。若是隊列已經滿了,那麼 put 方法將阻塞直到有空間可用;若是隊列爲空,那麼 take 方法將會阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠都不會充滿,所以無界隊列上的 put 方法永遠不會阻塞。

  阻塞隊列支持 生產者--消費者 這種設計模式。BlockingQueue 簡化了生產者--消費者的實現過程。阻塞隊列提供一個 offer 方法,若是數據項不能被添加到隊列中,那麼將返回一個失敗狀態。這樣你就可以建立更多靈活的策略來處理符合過載的狀況,例如減輕負載,將多餘的工做項序列化並寫入磁盤,減小生產者線程的數量,或者經過某種方式來一致生產者線程。

在構建高可靠的應用程序時,有界隊列是一種強大的資源管理工具:它們能抑制並防止產生過多的工做項,使應用程序在負荷過載的狀況下變得更加健壯。

 

5.3.1  示例:桌面搜索

  有一種類型的程序適合被分解爲生產者和消費者,例如代理程序,它將掃描本地驅動器上的文件並創建索引以便隨後進行搜索,相似於某些桌面搜索程序。

  程序清單 5-8:桌面搜索應用程序中的生產者任務和消費者任務

public class FileCrawler implements Runnable {
    private final BlockingQueue<File> filequeue;
    private final FileFilter fileFilter;
    private final File root;

    public FileCrawler(BlockingQueue<File> filequeue, FileFilter fileFilter, File root) {
        this.filequeue = filequeue;
        this.fileFilter = fileFilter;
        this.root = root;
    }

    @Override
    public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }

    private void crawl(File root) throws InterruptedException {
        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!alreadyIndexed(entry)) {
                    filequeue.put(entry);  //
                }
            }
        }
    }
}

public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                indexFile(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  程序清單 5-9:啓動桌面搜索

public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingDeque<>(BOUND);
        FileFilter filter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return true;
            }
        };
        for (File root : roots) {
            new Thread(new FileCrawler(queue, filter, root)).start();
        }
        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new Indexer(queue)).start();
        }
    }

 

5.3.3  雙端隊列與工做密取

  Java6.0 增長了兩種容器類型,Deque(發音爲‘deck’)和 BlockingDuque,它們分別對 QueueBlockingQueue 進行了擴展。Deque 是一個雙端隊列,實現了在隊列頭和隊列尾的高效插入和移除

  正如阻塞隊列適用於生產者--消費者模式,雙端隊列一樣適用於另外一種相關模式,即工做密取(Work Stealing)。在生產者--消費者設計中,全部消費者有一個共享的工做隊列,而在工做密取設計中,每一個消費者都有各自的雙端隊列。若是一個消費者完成了本身雙端隊列中的所有工做,那麼它能夠從其餘消費者雙端隊列末尾祕密地獲取工做。

 

5.4  阻塞方法與中斷方法

  線程可能阻塞或暫停執行,緣由有多種:等待 I/O 操做結束,等待得到一個鎖,等待從 Thread.sleep 方法中醒來,或是等待另外一個線程的計算結果。當線程阻塞時,它一般被掛起,並處於阻塞狀態。 BlockingQueueputtake 方法則會拋出受檢查異常 InterruptedException。

  Thread 提供了 interrupt 方法,用於中斷線程或者查詢線程是否已經中斷。當在代碼中調用了一個將拋出 InterruptedException 異常的方法時,你本身的方法也就變成了一個阻塞方法,而且須要處理對中斷的響應。對此,有兩種基本選擇:

  • 傳遞 InterruptedException:避開這個異常一般是最明確的策略----只須要把 InterruptedException 傳遞給方法的調用者。傳遞 InterruptedException 的方法包括,根本不捕獲該異常,或者捕獲該異常,而後在執行某種簡單的清理工做後再次拋出這個異常。
  • 恢復中斷。有時候不能拋出 InterruptedException。例如當代碼是 Runable 的一部分時。在這些狀況下,必須捕獲 InterruptedException,並經過調用當前線程上的 interupt 方法恢復中斷狀態,這樣在調用棧中更高層的代碼將看到引起一箇中斷。如代碼清代 5-10:恢復中斷狀態以免屏蔽中斷
        public void run() {
                try {
                    doSomething();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢復中斷狀態
                }
        }

     

5.5  同步工具類

  在容器類中,阻塞隊列是一種獨特的類:它們不只能做爲保存對象的容器,還能協調生產者和消費者等線程之間的控制流,由於 take 和 put 等方法將阻塞,知道隊列達到指望的狀態(隊列既非空,也非滿)。

  阻塞度咧能夠做爲同步工具類,其餘類型的同步工具類還包括 信號量 Semaphore、柵欄 Barrier、閉鎖 Latch

 

5.5.1  閉鎖

  閉鎖是一種同步工具類,能夠延遲線程的進度知道其到達終止狀態。閉鎖的做用至關於一扇門:在閉鎖到達結束狀態以前,這扇門一直是關閉的,而且沒有任何線程能經過,當到達結束狀態時,這扇門會打開並容許全部的線程經過。當閉鎖到達結束狀態後,將不會再改變狀態,所以這扇門將永遠保持打開狀態

  閉鎖包括一個計數器,該計數器初始化爲一個正數,表示須要等待的事件數量。 countDown 方法遞減計數器,表示有一個事件已經發生,而 await 方法等待計數器達到零,這表示全部須要等待的事件都已經發生。

  在下程序清單 5-11 中 TestHarness中給出了閉鎖的兩種常見用法。TestHarness 建立必定數量的線程,利用它們併發地執行指定任務。它使用兩個閉鎖,分別表示「起始門(Starting Gate)」和「結束門(Ending Gate)」。起始門計數器的初始值爲1,而結束們計數器的初始值爲工做線程數量。每一個工做線程首先要作的就是在啓動門上等待,從而確保全部線程都就緒後纔開始執行。而每一個線程要作的最後一件事情時將調用結束門的 countDown 方法,這樣使主線程高效地等待知道全部工做線程都執行完成,所以統計所消耗的時間。

  程序清單5-11 :在計時測試中使用 CountDownLatch 來啓動和中止線程

public class TestHarness {
    public long teimeTasks(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        // do not 
                    }
                }
            };
            t.start();
        }
        long startTime = System.nanoTime();
        startGate.countDown();  //開啓
        endGate.await();        //等待全部線程結束
        long endTime = System.nanoTime();
        return endTime - startTime;
    }
}

 

5.5.2   FutureTask

  FutureTask 也能夠用來作閉鎖。FutureTask表示的計算時經過 Callable 來實現的,至關於一種可生成結果的 Runnable,而且能夠處於如下三種狀態:等待運行,正在運行和運行完成。「執行完成」表示計算的全部可能結束方式,包括正常結束、因爲取消而結束和因爲異常而結束等。

  FutureTask.get 的行爲取決於任務的狀態。若是任務已經完成,那麼 get 會當即返回結果,不然 get 將阻塞直到任務進入完成狀態,而後返回結果或拋出異常。

  FutureTask 在 Executor 框架中表示異步任務, 能夠用來作一些時間較長的計算,能夠在使用計算結果以前提早啓動。 程序清單 5-12:使用FutureTask 來提早加載稍後須要的數據

public class PreLoader {
    private final FutureTask<ProductInfo> futureTask = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
        @Override
        public ProductInfo call() throws Exception {
            return loadProductInfo();
        }
    });
    private final Thread thread = new Thread(futureTask);
    public void start() {
        thread.start();
    }
    public ProductInfo get() throws InterruptedException{
        try {
            return futureTask.get();   // 記得調用 start() 否則會一直在這裏等候
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) throw (DataLoadException)cause;
            else ;
        }
        return null;
    }
}

   當程序須要 ProductInfo 時,能夠調用 get 方法,若是數據已經加載,那麼將返回這些數據,不然將等待加載完成後再返回。

 

5.5.3  信號量

  計數信號量(Counting Semaphore)用來控制同時訪問某個特定資源的操做數量,或者同時執行某個指定操做的數量。計數信號量能夠用來實現某種資源池,或者對容器施加邊界。

  Semaphore 中管理者一組虛擬的許可(Permit),許可的初始數量能夠經過構造函數來指定。在執行操做時能夠首先得到許可(只要還有剩餘的許可),並在使用之後釋放許可。若是沒有許可,那麼 acquire 將阻塞直到有許可(或者直到被中斷或者操做超時)。release 方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值爲 1 的 Semaphore 。二值信號量能夠用戶互斥體(mutex),並具有不可重入的加鎖語義:誰擁有這個惟一的許可,誰就擁有了互斥鎖。

  若是將 Semaphore 的計數值初始化爲池的大小,並在從池中獲取一個資源以前首先調用 acquire 方法獲取一個許可,在將資源返回給池以後調用 release 釋放許可,那麼 acquire 將一直阻塞直到資源池不爲空。

  程序清單 5-14 使用 Semaphore 爲容器設置邊界 

public class BoundedHashSet<A> {
    private final Set<A> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        this.sem = new Semaphore(bound);
    }
    public boolean add(A a) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(a);
            return wasAdded;
        } finally {
            if (!wasAdded) sem.release();
        }
    }
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) sem.release();
        return wasRemoved;
    }
}

 

5.5.4  柵欄

  柵欄 Barrier 相似於閉鎖,它能阻塞一組線程直到某個事件發生。柵欄與閉鎖的關鍵區別在於,全部線程必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其餘線程

  CyclicBarrier 可使必定數量的參與方反覆地在柵欄位置聚集,它在並行迭代算法中很是有用:這種算法一般將一個問題拆分紅一系列相互獨立的子問題。

  當線程到達柵欄位置時將調用 await 方法,這個方法將阻塞直到全部線程都到達柵欄位置。若是全部線程都到達了柵欄位置,那麼柵欄將打開,此時全部線程都被釋放,而柵欄將被重置以便下次使用。

  在程序清單 5-15 的 CellularAutomata 中給出瞭如何經過柵欄來計算細胞的自動化模擬,例如 Conway 的生命遊戲。在把模擬過程並行化,爲每一個元素(在這個實例中至關於一個細胞)分配一個獨立的線程是不現實的,由於這將會產生過多的線程,而在協調這些線程上致使德開銷將下降計算性能。合理的作法是:將問題分解成必定數量的子問題,爲每一個子問題分配一個線程來進行求解,以後再將全部的結果合併起來。

  程序清單 5-15:經過 CyclicBarrier 協調細胞自動衍生系統中的計算

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count, new Runnable() {  //
            @Override
            public void run() {
                mainBoard.commitNewValues();
            }
        });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++) {
            new Thread(workers[i]).start();
        }
        mainBoard.waitForConvergence();
    }

    private class Worker implements Runnable {
        private final Board board;
        public Worker(Board board) {
            this.board = board;
        }
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY; y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
                try {
                    barrier.await();  //
                } catch (InterruptedException e) {
                    return;
                } catch (BrokenBarrierException e) {
                    return;
                }
            }
        }
    }
}

 

5.6  構建高效且可伸縮的結果緩存

  幾乎全部的服務器應用程序都會使用某種形式的緩存。重用以前的計算結果能下降延遲,提升吞吐量,但卻須要消耗更多的內存。本節咱們將開發一個高效且可伸縮的緩存,用於改進一個高計算開銷的函數。

  程序清單 5-16 :使用 HashMap 和同步機制來初始化緩存

public interface Computable<A, V> {          【皺眉】
    V compute(A arg) throws InterruptedException;
}

public class ExpensiveFunction implements Computable<String, BigInteger>{
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        //通過長時間計算後
        return new BigInteger(arg);
    }
}

public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<>(); //使用HashMap 存儲
    private final Computable<A, V> c;
    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);  
        if (result == null) {   // 單例容器,最好有長時間自動銷燬功能
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

  在程序清單 5-16 中 Memoizer1 給出一種嘗試:使用 HashMap 來保存計算結果,compute 檢查是否有緩存,有則返回,沒有則計算後保存緩存再返回。

  可是 HashMap 不是線程安全的,所以要確保兩個線程同時訪問 HashMap,Memoizer1 給出一種保守的方法,即對整個 compute 方法進行同步,可見這有明顯的伸縮性問題。

  下面的 程序清單 5-17 中 Memoizer2 用 ConcurrentHashMap 來代替 HashMap 來改進 Memoizer1 中的併發行爲。Memoizer2 比 Memoizer1 有着更好的併發行爲:多線程能夠併發地使用它。但它做爲緩存仍然存在一些不足----當兩個線程同時調用 compute 時存在一個漏洞,可能會致使計算獲得相同值。

  程序清單 5-17 :用 ConcurrentHashMap 替換 HashMap

public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<>(); //使用HashMap 存儲
    private final Computable<A, V> c;
    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {  //去掉同步
        V result = cache.get(arg);
        if (result == null) {   
            result = c.compute(arg); //假設這出現併發狀況,極可能會進行兩次計算,不符合要求
            cache.put(arg, result);
        }
        return result;
    }
}

  Memoizer2 的問題在於,若是某個線程啓動了一個開銷很大的計算,而其餘線程並不知道這個計算正在進行,那麼極可能會重複計算。咱們但願實現 「線程A在計算 f(21)」,另外一個線程查找 f(21) 時,它可以知道  f(21) 正在進行,而且等線程 A 計算結束後,再去查詢緩存  f(21) 的結果。

  咱們已經知道有個類能基本實現這個功能:FutureTask。FutureTask 表示一個計算的過程,這個過程可能已經計算完成,也可能正在進行。若是有結果可用,那麼 FutureTask.get 將當即返回結果,不然它會一直阻塞,直到結果計算出來並返回。

  程序清單 5-18 中 Memoizer3 將用於緩存值的 Map 從新定義爲 ConcurrentHashMap<A, Future<V>>,來替換原來的 ConcurrentHashMap<A, V> 。

  程序清單 5-18:基於 FutureTask 的 Memoizing 封裝器

public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer3(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> result = cache.get(arg);
        if (result == null) {
            FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            });
            result = ft;
            cache.put(arg, ft); //仍是可能出現  併發問題
            ft.run();
        }
        try {
            return result.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

  Memoizer3 的實現幾乎是完美的:它表現出了很是好的併發性(基本上是源於 ConcurrentHashMap 高效的併發性)。但它還有一個小缺陷,即仍然存在兩個線程計算出相同值的漏洞。因爲 compute 方法中的 if 代碼塊仍然是非原子的「先檢查再執行」操做。所以兩個線程仍然可能在同一時間調用 compute 來計算相同的值,即兩者沒有在緩存中找到指望的值,所以都開始計算。

  Memoizer3 中存在這個問題的緣由是,複合操做(「若沒有則添加」)是在底層的Map 對象上執行的,而這個對象沒法經過加鎖來確保原子性。程序清單 5-19 中的 Memoizer 使用了 ConcurrentMap 中的原子方法 putIfAbsent,避免了Memoizer3  中的漏洞。

  程序清單 5-19 Memoizer的最終實現

public class Memoizer<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> result = cache.get(arg);
        if (result == null) {
            FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            });
            result = cache.putIfAbsent(arg, ft);
            if (result == null) {
                result = ft;
                ft.run();
            }
        }
        try {
            return result.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

  在完成併發緩存的實現後,就能夠爲在實際中使用了。程序清單 5-20 :在因式分解 servlet 中使用 Memoizer 來緩存結果

public class Factorizer implements Servlet {
    private final Computable<BigInteger, BigInteger[]> c = new Computable<BigInteger, BigInteger[]>() {
        @Override
        public BigInteger[] compute(BigInteger arg) throws InterruptedException {
            return factor(arg);
        }
    };
    private final Computable<BigInteger, BigInteger[]> cache = new Memoizer<>(c);
    public void service(ServletRequest req, ServletResponse resp) {
        try {
            BigInteger i = extractFromRequest(req);
            encodeInroResponse(resp, cache.compute(i)); //使用本身寫的緩存工具
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索