七週七併發之線程與鎖

1.概述

1.1併發仍是並行(Concurrent or Parallel)

  • A concurrent program has multiple logical threads of control. These threads may or may not run in parallel.java

  • A parallel program may or may not have more than one logical thread of control.node

  • 併發是問題域中的概念:程序須要設計成可以處理多個同時(幾乎同時)發生的事件;而並行則是方法域中的概念:經過將問題中的多個部分併發執行,來加速解決問題.程序員

  • 併發是同一時間**應對(dealing with)多件事件的處理能力;並行是同一時間動手作(doing)**多件事情的能力算法

  • 能夠同時處理多個問題,可是每次只作一件事,是併發.編程

  • 我妻子是一位教師。與衆多教師同樣,她極其善於處理多個任務。她雖然每次只能作一件事,但能夠併發地處理多個任務。好比,在聽一位學生朗讀的時候,她能夠暫停學生的朗讀,以維持課堂秩序,或者回答學生的問題。這是併發,但併不併行(由於僅有她一我的,某一時刻只能進行一件事)。 但若是還有一位助教,則她們中一位能夠聆聽朗讀,而同時另外一位能夠回答問題。這種方式既是併發,也是並行。 假設班級設計了本身的賀卡並要批量製做。一種方法是讓每位學生製做五枚賀卡。這種方法是並行,而(從總體看)不是併發,由於這個過程總體來講只有一個任務。緩存

  • 併發和並行常常被混淆的一個緣由是,傳統的「線程與鎖」模型並無顯式支持並行。若是要用線程與鎖模型爲多核進行開發,惟一的選擇就是寫一個併發的程序,讓其並行地運行在多核上。安全

  • 併發程序一般是不肯定的, 並行程序多是肯定的. 使用一門直接支持並行的編程語言能夠寫出並行程序,而不會引入不入不 肯定性.網絡

1.2 並行架構

位級(bit-level)並行

  • 32位計算機比8位計算機運行速度更快,由於並行,對於兩個32位數的加法,8位計算 機必須進行屢次8位計算,而32位計算機能夠一步完成,即並行地處理32位數的4字節。

指令級(instruction-level)並行

  • 現代CPU的並行度很高,其中使用的技術包括流水線、亂序執行和猜想執行等。入多核時代,咱們必須面對的狀況是:不管是表面上仍是實質上,指令都再也不串行 執行了。這個就是內存可見性的問題啊,因爲cpu對指令的重排序!!重排序問題的引入

數據級(data)並行 數據級並行

  • 圖像處理就是一種適合進行數據級並行的場景。好比,爲了增長圖片亮度就須要增長每個像 素的亮度。現代GPU(圖形處理器)也因圖像處理的特色而演化成了極其強大的數據並行處理器。

任務級(task-level)並行

  • 終於來到了你們所默認的並行形式——多處理器。從程序員的角度來看,多處理器架構最明 顯的分類特徵是其內存模型(共享內存模型或分佈式內存模型)。多線程

  • 共享內存模型(經過內存通訊)架構

  • 分佈式內存模型( 經過網絡通訊 )

1.3 併發:不僅是多核

  • 併發的世界,併發的軟件
  • 分佈式的世界,分佈式的軟件
  • 不可預測的世界,容錯性強的軟件
  • 複雜的世界,簡單的軟件
  • 在選對編程語言和工具的狀況下,比起串行的等價解決方案,一個併發的解決方案會更簡潔清晰。
  • 若是解決方案有着與問題相似的併發結構,就會簡單許多:咱們不須要建立一個複雜的線程來處理問題中的多個任務,只須要用多個簡單的線程分別處理不一樣的任務便可。

1.4 七個模型

線程與鎖

  • 線程與鎖模型有不少衆所周知的不足,但還是其餘模型的技術基礎,也是不少併發軟件開發的首選。

函數式編程

  • 函數式編程日漸重要的緣由之一,是其對併發編程和並行編程提供了良好的支持。函數式編程消除了可變狀態,因此從根本上是線程安全的,並且易於並行執行。

Clojure之道——分離標識與狀態

  • 編程語言Clojure是一種指令式編程和函數式編程的混搭方案,在兩種編程方式上取得了微妙的平衡來發揮二者的優點。

actor

  • actor模型是一種適用性很廣的併發編程模型,適用於共享內存模型和分佈式內存模型,也適合解決地理分佈型問題,能提供強大的容錯性。

通訊順序進程(Communicating Sequential Processes,CSP)

  • 表面上看,CSP模型與actor模型很類似,二者都基於消息傳遞。不過CSP模型側重於傳遞信息的通道,而actor模型側重於通道兩端的實體,使用CSP模型的代碼會帶有明顯不一樣的風格。

數據級並行

  • 每一個筆記本電腦裏都藏着一臺超級計算機——GPU。GPU利用了數據級並行,不只能夠快速進行圖像處理,也能夠用於更廣闊的領域。若是要進行有限元分析、流體力學計算或其餘的大量數字計算,GPU的性能將是不二選擇。

Lambda架構

  • 大數據時代的到來離不開並行——如今咱們只須要增長計算資源,就能具備處理TB級數據的能力。Lambda架構綜合了MapReduce和流式處理的特色,是一種能夠處理多種大數據問題的架構。

2.線程與鎖

2.1 簡單粗暴

  • 線程與鎖實際上是是對底層硬件運行過程的形式化.這是他的最大優勢也是最大缺點
  • 如今的優秀代碼不多直接使用底層服務:不該在產品代碼上直接使用Thread類等底層服務

2.2 第一天:互斥和內存模型

競態條件

內存可見性

class Counter {
    private int count = 0;
    public synchronized void increment() { ++count; } ➤
    public int getCount() { return count; }
}
  • 這段代碼沒有競態條件的bug 可是又內存可見性的bug 由於getCount()沒有加鎖,調用getCount()可能得到一個失效的值

死鎖

  • 哲學家進餐問題
class Philosopher extends Thread {
    private Chopstick left, right;
    private Random random;

    public Philosopher(Chopstick left, Chopstick right) {
        this.left = left; this.right = right;
                random = new Random();
    }

    public void run() {
        try {
            while(true) {
                    Thread.sleep(random.nextInt(1000)); // Think for a while
                synchronized(left) { // Grab left chopstick //
                    synchronized(right) { // Grab right chopstick // 15
                        Thread.sleep(random.nextInt(1000)); // Eat for a while
                    }
                }
            }
        } catch(InterruptedException e) {}
    }
}
  • 建立五個哲學家實例,這個程序能夠愉快的運行好久,但到某個時刻一切會停下來:若是全部哲學家同時進餐,就會死鎖(相鄰的幾個同時準備進餐還不至於會死鎖)
  • 一個線程想使用多把鎖的時候就要考慮死鎖的可能,有一個簡單的規則還有避免死鎖:老是按照一個全局的固定的順序獲取多把鎖,其中一種實現以下:
class Philosopher extends Thread {
    private Chopstick first, second;
    private Random random;
    private int thinkCount;

    public Philosopher(Chopstick left, Chopstick right) {
        if(left.getId() < right.getId()) {
            first = left; second = right;
        } else {
            first = right; second = left;
        }
        random = new Random();
    }

    public void run() {
        try {
            while(true) {
                ++thinkCount;
                if (thinkCount % 10 == 0)
                    System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
                Thread.sleep(random.nextInt(1000));     // Think for a while
                synchronized(first) {                   // Grab first chopstick
                    synchronized(second) {                // Grab second chopstick
                        Thread.sleep(random.nextInt(1000)); // Eat for a while
                    }
                }
            }
        } catch(InterruptedException e) {}
    }
}
  • 程序解釋:當全部人同時決定進餐的時候,ABCD左手分別拿起1234號筷子(對於他們小的編號的筷子仍是在左手),這和上面的程序沒啥不一樣,可是差異就在這個E,他左邊的筷子是大編號,因此他左手拿的是1,然而1被A拿了,因此他就一隻筷子都拿不到,因此D能夠正常進餐,就不會死鎖

  • 侷限:獲取鎖的代碼寫的比較集中的話,有利於維護這個全局順序,可是對於規模比較大的程序,使用鎖的地方比較零散,各處都遵照這個順序就顯得不太實際.

  • 技巧:使用對象的散列值做爲鎖的全局順序

  • 優勢:適用於全部java對象,不用爲鎖對象專門定義並維護一個順序,

  • 缺點:可是對象的散列值不能保證惟一性(雖然概率很小), 不是無可奈何不要使用

if(System.identityHashCode(left) < System.identityHashCode(right)) {
            first = left; second = right;
} else {
            first = right; second = left;
}

來自外星方法的危害

private synchronized void updateProgress(int n) {
    for (ProgressListener listener: listeners) // listeners是累的一個field
      listener.onProgress(n);
  }
  • 上面的方法乍一看好像沒啥問題,可是這個方法調用了onProgress()方法,咱們對這個方法一無所知, 要是他裏面還有一把鎖,就可能會死鎖
  • 解決方案:在遍歷前對listeners進行保護性複製(defensive copy),再針對這份副本進行遍歷
private void updateProgress(int n) {
    ArrayList<ProgressListener> listenersCopy;
    synchronized(this) {
      listenersCopy = (ArrayList<ProgressListener>)listeners.clone();
    }
    for (ProgressListener listener: listenersCopy)
      listener.onProgress(n);
  }
  • 這個方案一石多鳥,不只調用外星方法的時候不用加鎖,並且還大大減小了代碼持有鎖的時間(前面是對方法加synchronized,這裏是對語句塊)

避免危害的準則

  • 1.對共享變量的全部訪問都須要同步化(讀髒數據,競態條件)
  • 2.讀線程和寫線程都須要同步化(內存可見性)
  • 3.按照約定的全局順序獲取多把鎖(死鎖)
  • 4.當持有鎖的時候避免調用外星方法(你對外星方法一無所知,要是他裏面有鎖,就會死鎖)
  • 5.持有鎖的時間儘量短

2.3次日:超越內置鎖

  • ReentrantLock提供了顯式的lock和unlock
Lock lock = new ReentrantLock();
lock.lock();
try{
  //使用共享資源

} finally { //使用finally確保鎖釋放
  lock.unlock();
}

可中斷的鎖

  • 使用內置鎖,因爲阻塞的線程沒法被中斷,因此程序不可能從死鎖中恢復,能夠用ReentrantLock代替內置鎖,使用它的lockInterruptibly 在下面的程序中使用Thread.interrupt()可讓線程終止(這裏說的都是死鎖狀況下)
final ReentrantLock l1 = new ReentrantLock();
    final ReentrantLock l2 = new ReentrantLock();

    Thread t1 = new Thread() {
      public void run() {
        try {
          l1.lockInterruptibly();
          Thread.sleep(1000);
          l2.lockInterruptibly();
        } catch (InterruptedException e) { System.out.println("t1 interrupted"); }
      }
    };

超時

  • ReentrantLock突破了內置鎖的另外一個限制:能夠爲獲取鎖的操做設置超時時間,能夠用這種方式來解決哲學家進餐問題
class Philosopher extends Thread {
  private ReentrantLock leftChopstick, rightChopstick;
  private Random random;
  private int thinkCount;

  public Philosopher(ReentrantLock leftChopstick, ReentrantLock rightChopstick) {
    this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick;
    random = new Random();
  }

  public void run() {
    try {
      while(true) {
        ++thinkCount;
        if (thinkCount % 10 == 0)
          System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
        Thread.sleep(random.nextInt(1000)); // Think for a while
        leftChopstick.lock();
        try {
          if (rightChopstick.tryLock(1000, TimeUnit.MILLISECONDS)) {
            // Got the right chopstick
            try {
              Thread.sleep(random.nextInt(1000)); // Eat for a while
            } finally { rightChopstick.unlock(); }
          } else {
            // Didn't get the right chopstick - give up and go back to thinking
            System.out.println("Philosopher " + this + " timed out");
          }
        } finally { leftChopstick.unlock(); }
      }
    } catch(InterruptedException e) {}
  }
}
  • 雖然上述代碼不會死鎖,但也不是一個足夠好的方案,後面有更好的方案
  • 1.這個方案不能避免死鎖,只能避免無盡的死鎖 只是提供了從死鎖中恢復的手段
  • 2.會受到活鎖現象,若是全部死鎖線程同時超時,它們極有可能再次陷入死鎖,雖然死鎖沒有永遠持續下去,但對資源的爭奪情況沒有獲得任何改善(能夠用一些方法減小活鎖的概率,好比爲每一個線程設置不一樣的超時時間)

交替鎖(hand-over-hand locking)

  • 交替鎖能夠只鎖住鏈表的一部分,容許不涉及被鎖部分的其餘線程自由訪問鏈表.插入新的鏈表節點時,須要將待插入位置兩邊的節點加鎖.首先鎖住鏈表的前兩個節點,若是這兩個節點之間不是待插入的位置,那麼就解鎖第一個,並鎖住第三個,以此類推,知道找到待插入位置並插入新的節點,最後解鎖兩邊的節點

  • 這種交替型的加鎖和解鎖順序沒法用內置鎖實現,使用ReentrantLock能夠

class ConcurrentSortedList {
  private class Node {
    int value;
    Node prev;
    Node next;
    ReentrantLock lock = new ReentrantLock();

    Node() {}

    Node(int value, Node prev, Node next) {
      this.value = value; this.prev = prev; this.next = next;
    }
  }

  private final Node head;
  private final Node tail;

  public ConcurrentSortedList() {
    head = new Node(); tail = new Node();
    head.next = tail; tail.prev = head;
  }

  //insert方法是有序的 遍歷列表直到找到第一個值小於等於新插入的值得節點,在這個位置插入
  public void insert(int value) {
    Node current = head;
    current.lock.lock();
    Node next = current.next;
    try {
      while (true) {
        next.lock.lock();
        try {
          if (next == tail || next.value < value) {
            Node node = new Node(value, current, next);
            next.prev = node;
            current.next = node;
              //!!!這裏return要在兩個finally都執行完後纔會執行啊!!!但只是finally裏的.不過要是return換成exit(0)就直接退出了

            return; 
          }
        } finally { current.lock.unlock(); }
        current = next;
        next = current.next;
      }
    } finally { next.lock.unlock(); }
  }

  public int size() {
    Node current = tail;
    int count = 0;

    while (current.prev != head) {
      ReentrantLock lock = current.lock;
      lock.lock();
      try {
        ++count;
        current = current.prev;
      } finally { lock.unlock(); }
    }

    return count;
  }

  public boolean isSorted() {
    Node current = head;
    while (current.next.next != tail) {
      current = current.next;
      if (current.value < current.next.value)
        return false;
    }
    return true;
  }
}

class LinkedList {
  public static void main(String[] args) throws InterruptedException {
    final ConcurrentSortedList list = new ConcurrentSortedList();
    final Random random = new Random();

    class TestThread extends Thread {
      public void run() {
        for (int i = 0; i < 10000; ++i)
          list.insert(random.nextInt());
      }
    }

    class CountingThread extends Thread {
      public void run() {
        while (!interrupted()) {
          System.out.print("\r" + list.size());
          System.out.flush();
        }
      }
    }

    Thread t1 = new TestThread();
    Thread t2 = new TestThread();
    Thread t3 = new CountingThread();
    //注意一下這裏的用法 這裏先join再interrupted的用法
    t1.start(); t2.start(); t3.start();
    t1.join(); t2.join();
    t3.interrupt();
 
    System.out.println("\r" + list.size());
 
    if (list.size() != 20000)
      System.out.println("*** Wrong size!");
 
    if (!list.isSorted())
      System.out.println("*** Not sorted!");
  }
}
  • 26行的 next.lock.lock();鎖住了頭,36行的 next.lock.lock();鎖住了下一個節點. if ( next == tail || next.value < value )判斷兩個節點之間是不是待插入位置,若是不是,在38行的finally解鎖 current.lock.unlock();並繼續遍歷,若是找到待插入位置,33~36行構造新節點並將其插入鏈表後返回.兩把鎖的解鎖操做在倆finally塊中進行
  • 這種方案可讓多個線程併發的進行鏈表插入操做

條件變量

  • 併發編程常常須要等待某個事件的發生.好比隊列刪除元素前須要等待隊列非空等等
  • 按照下面的模式使用條件變量
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
  while (! « condition is true » ) {
    condition.await();
  }
  //使用共享資源
} finally { lock.unlock(); }
  • 爲什麼要在一個循環中循環調用await():當另外一個線程調用了signal()或signalAll(),意味着對應的條件可能爲真,await()將原子地恢復運行並從新加鎖.從await()返回後須要從新檢查等待的條件是否爲真,必要的話可能再次調用await()並阻塞
  • 哲學家進餐問題新解決方法
class Philosopher extends Thread {

    private boolean eating;
    private Philosopher left;
    private Philosopher right;
    private ReentrantLock table;
    private Condition condition;
    private Random random;
    private int thinkCount;

    public Philosopher ( ReentrantLock table ) {
        eating = false;
        this.table = table;
        condition = table.newCondition();
        random = new Random();
    }

    public void setLeft ( Philosopher left ) {
        this.left = left;
    }

    public void setRight ( Philosopher right ) {
        this.right = right;
    }

    public void run () {
        try {
            while ( true ) {
                think();
                eat();
            }
        }
        catch ( InterruptedException e ) {
        }
    }

    private void think () throws InterruptedException {
        table.lock();
        try {
            eating = false;
            left.condition.signal();
            right.condition.signal();
        } finally {
            table.unlock();
        }
        ++thinkCount;
        if ( thinkCount % 10 == 0 ) {
            System.out.println( "Philosopher " + this + " has thought " + thinkCount + " times" );
        }
        Thread.sleep( 1000 );
    }

    private void eat () throws InterruptedException {
        table.lock();
        try {
            while ( left.eating || right.eating ) {
                condition.await();
            }
            eating = true;
        } finally {
            table.unlock();
        }
        Thread.sleep( 1000 );
    }
}
  • 如今沒有筷子類,如今僅當哲學家的左右鄰座都沒有進餐的時候,他才能夠進餐
  • 當一個哲學家飢餓的時候,他首先鎖住餐桌,這樣其餘哲學家沒法改變狀態(進餐/思考),而後查看左右的哲學家有沒有在進餐,沒有的話開始進餐並解鎖餐桌,不然調用await(),解鎖餐桌
  • 當一個哲學家進餐結束開始思考的時候,他首先鎖住餐桌並將eating設置爲false,而後通知左鄰右舍能夠進餐了,最後解鎖餐桌.
  • 以前的解決方案常常只有一個哲學家能進餐,其餘人都持有一根筷子在等,這個方案中當一個哲學家理論上能夠進餐,他確定能夠進餐

原子變量

  • 原子變量是無鎖非阻塞算法的基礎
  • volatile是一種低級形式的同步,他的適用場景也愈來愈少,若是你要使用volatile,能夠在atomic包中尋找更適合的工具

2.4 站在巨人的肩膀上

寫入時複製

  • 以前有用到保護性複製,Java標準庫提供了更優雅的現成的方案--CopyOnWriteArrayList,他不是在遍歷列表前進行復制,而是在列表被修改時進行
  • 先將當前容器進行Copy,複製出一個新的容器,而後新的容器裏添加元素,添加完元素以後,再將原容器的引用指向新的容器 因此CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不一樣的容器。CopyOnWriteArrayList適合使用在讀操做遠遠大於寫操做的場景裏,好比緩存
  • 缺點: 1.內存佔用問題 2.數據一致性問題:CopyOnWrite容器只能保證數據的最終一致性,不能保證數據的實時一致性。因此若是你但願寫入的的數據,立刻能讀到,請不要使用CopyOnWrite容器。
//Downloader.java
  private CopyOnWriteArrayList<ProgressListener> listeners;


  public void addListener(ProgressListener listener) {
    listeners.add(listener);
  }
  public void removeListener(ProgressListener listener) {
    listeners.remove(listener);
  }
  private void updateProgress(int n) {
    for (ProgressListener listener: listeners)
      listener.onProgress(n);
  }

一個完整的程序

  • Q:wiki上出現頻率最高的詞

版本一的並行:生產者和消費者(串行的我略過了)

//生產者 將page加到隊尾
class Parser implements Runnable {
  private BlockingQueue<Page> queue;


  public Parser(BlockingQueue<Page> queue) {
    this.queue = queue;
  }
  
  public void run() {
    try {
      Iterable<Page> pages = new Pages(100000, "enwiki.xml");
      for (Page page: pages)
        queue.put(page);
    } catch (Exception e) { e.printStackTrace(); }
  }
}
//消費者
class Counter implements Runnable {
  private BlockingQueue<Page> queue;
  private Map<String, Integer> counts;
  
  public Counter(BlockingQueue<Page> queue,
                 Map<String, Integer> counts) {
    this.queue = queue;
    this.counts = counts;
  }

  public void run() {
    try {
      while(true) {
        Page page = queue.take();
        if (page.isPoisonPill())
          break;


        Iterable<String> words = new Words(page.getText());
        for (String word: words)
          countWord(word);
      }
    } catch (Exception e) { e.printStackTrace(); }
  }


  private void countWord(String word) {
    Integer currentCount = counts.get(word);
    if (currentCount == null)
      counts.put(word, 1);
    else
      counts.put(word, currentCount + 1);
  }
}
  • 最後建立兩個線程運行
public static void main(String[] args) throws Exception {
    ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);
    HashMap<String, Integer> counts = new HashMap<String, Integer>();
 
    Thread counter = new Thread(new Counter(queue, counts));
    Thread parser = new Thread(new Parser(queue));
    long start = System.currentTimeMillis();
 
    counter.start();
    parser.start();
    parser.join();
    queue.put(new PoisonPill());
    counter.join();
    long end = System.currentTimeMillis();
    System.out.println("Elapsed time: " + (end - start) + "ms");
  }
  • 程序解釋:該程序由兩個線程在跑.一個讀取一個分析,性能還不是最高.這裏 ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);用了一個阻塞的併發隊列來存放讀取到的page.這個併發隊列很適合實現生產者消費者模式,提供了高效的併發方法put()和take(),這些方法會在必要時阻塞:當一個空隊列調用take(一個滿隊列調用put()),程序會阻塞直到隊列變爲非空(非滿)
  • 爲何要用阻塞隊列? concurrent包不只提供了阻塞隊列,還提供了一種容量無限,操做不需等待,非阻塞的隊列ConcurrentLinkedQueue.爲什麼不用他?關鍵在與生產者和消費者幾乎不會保持相同的速度,當生產者速度快於消費者,生產者愈來愈大的時候,會撐爆內存.相比之下,阻塞隊列只容許生產者的速度在必定程度上超過消費者的速度,但不會超過不少.

第二個版本:多個消費者

  • 上個版本的解析文件花了10s,統計花了95s,一共花了95s.進一步優化就對統計過程進行並行化,創建多個消費者.(他這裏仍是用一個count,不一樣的消費者都寫這一個count,因此要加鎖)
private void countWord(String word) {
    lock.lock();
    try {
      Integer currentCount = counts.get(word);
      if (currentCount == null)  counts.put(word, 1);
      else  counts.put(word, currentCount + 1);
    } finally { lock.unlock(); }
  }
  • 運行多個消費者
ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < NUM_COUNTERS; ++i)
      executor.execute(new Counter(queue, counts));
    Thread parser = new Thread(new Parser(queue));
    parser.start();
    parser.join();
    for (int i = 0; i < NUM_COUNTERS; ++i)
      queue.put(new PoisonPill());
    executor.shutdown();
  • 可是一運行發現比串行還慢一半.主要緣由就是過多的線程嘗試訪問同一個共享資源,等待的時間比運行的時間還長.改用ConcurrentHashMap(使用了鎖分段)
//改用 ConcurrentHashMap
  private void countWord(String word) {
    while (true) { //理解一下這裏的循環 若是下面的操做沒有成功的話,就重試
      Integer currentCount = counts.get(word);
      if (currentCount == null) {
        if (counts.putIfAbsent(word, 1) == null) //若是沒有與1關聯 則關聯,有原子性
          break;
      } else if (counts.replace(word, currentCount, currentCount + 1)) {
        break;
      }
    }
  • 此次的測速要好不少,可是沒有理論上的提速.由於消費者對conuts有一些沒必要要的競爭,與其全部消費者都共享一個counts,不如每一個消費者各自維護一個計數map,再對這些計數map進行合併
class Counter implements Runnable {

  private BlockingQueue<Page> queue;
  private ConcurrentMap<String, Integer> counts;
  private HashMap<String, Integer> localCounts;

  public Counter(BlockingQueue<Page> queue,
                 ConcurrentMap<String, Integer> counts) {
    this.queue = queue;
    this.counts = counts;
    localCounts = new HashMap<String, Integer>();
  }

  public void run() {
    try {
      while(true) {
        Page page = queue.take();
        if (page.isPoisonPill())
          break;
        Iterable<String> words = new Words(page.getText());
        for (String word: words)
          countWord(word);
      }
      //因此計數的那個能夠是普通的map 他只在本身的線程裏
      mergeCounts();
    } catch (Exception e) { e.printStackTrace(); }
  }

  private void countWord(String word) {
    Integer currentCount = localCounts.get(word);
    if (currentCount == null)
      localCounts.put(word, 1);
    else
      localCounts.put(word, currentCount + 1);
  }

  private void mergeCounts() {
    for (Map.Entry<String, Integer> e: localCounts.entrySet()) {
      String word = e.getKey();
      Integer count = e.getValue();
      while (true) {
        Integer currentCount = counts.get(word);
        if (currentCount == null) {
          if (counts.putIfAbsent(word, count) == null)
            break;
        } else if (counts.replace(word, currentCount, currentCount + count)) {
          break;
        }
      }
    }
  }
}

第三天總結

  • 1.使用線程池,不要直接建立線程
  • 2.使用CopyOnWriteArrayList讓監聽器相關的代碼更簡單高效
  • 3.使用ArrayBlockingQueue讓生產者和消費者之間高效合做
  • 4.ConcurrentHashMap提供了更好的併發訪問

2.5 複習

  • 線程與鎖的缺點:沒有爲並行提供直接的支持,對於程序員,編程語言層面沒有提供足夠的幫助
  • 應用多線程的難點不在編程,而在於難以測試,多線程的bug很難重現.可維護性更讓人頭疼,若是不能對多線程問題進行可靠的測試,就沒法對多線程進行可靠的重構.使用其餘不那麼底層的模型會好一些.
相關文章
相關標籤/搜索