A concurrent program has multiple logical threads of control. These threads may or may not run in parallel.java
A parallel program may or may not have more than one logical thread of control.node
併發是問題域中的概念:程序須要設計成可以處理多個同時(幾乎同時)發生的事件;而並行則是方法域中的概念:經過將問題中的多個部分併發執行,來加速解決問題.程序員
併發是同一時間**應對(dealing with)多件事件的處理能力;並行是同一時間動手作(doing)**多件事情的能力算法
能夠同時處理多個問題,可是每次只作一件事,是併發.編程
我妻子是一位教師。與衆多教師同樣,她極其善於處理多個任務。她雖然每次只能作一件事,但能夠併發地處理多個任務。好比,在聽一位學生朗讀的時候,她能夠暫停學生的朗讀,以維持課堂秩序,或者回答學生的問題。這是併發,但併不併行(由於僅有她一我的,某一時刻只能進行一件事)。 但若是還有一位助教,則她們中一位能夠聆聽朗讀,而同時另外一位能夠回答問題。這種方式既是併發,也是並行。 假設班級設計了本身的賀卡並要批量製做。一種方法是讓每位學生製做五枚賀卡。這種方法是並行,而(從總體看)不是併發,由於這個過程總體來講只有一個任務。緩存
併發和並行常常被混淆的一個緣由是,傳統的「線程與鎖」模型並無顯式支持並行。若是要用線程與鎖模型爲多核進行開發,惟一的選擇就是寫一個併發的程序,讓其並行地運行在多核上。安全
併發程序一般是不肯定的, 並行程序多是肯定的. 使用一門直接支持並行的編程語言能夠寫出並行程序,而不會引入不入不 肯定性.網絡
終於來到了你們所默認的並行形式——多處理器。從程序員的角度來看,多處理器架構最明 顯的分類特徵是其內存模型(共享內存模型或分佈式內存模型)。多線程
共享內存模型(經過內存通訊)架構
分佈式內存模型( 經過網絡通訊 )
class Counter { private int count = 0; public synchronized void increment() { ++count; } ➤ public int getCount() { return count; } }
class Philosopher extends Thread { private Chopstick left, right; private Random random; public Philosopher(Chopstick left, Chopstick right) { this.left = left; this.right = right; random = new Random(); } public void run() { try { while(true) { Thread.sleep(random.nextInt(1000)); // Think for a while synchronized(left) { // Grab left chopstick // synchronized(right) { // Grab right chopstick // 15 Thread.sleep(random.nextInt(1000)); // Eat for a while } } } } catch(InterruptedException e) {} } }
class Philosopher extends Thread { private Chopstick first, second; private Random random; private int thinkCount; public Philosopher(Chopstick left, Chopstick right) { if(left.getId() < right.getId()) { first = left; second = right; } else { first = right; second = left; } random = new Random(); } public void run() { try { while(true) { ++thinkCount; if (thinkCount % 10 == 0) System.out.println("Philosopher " + this + " has thought " + thinkCount + " times"); Thread.sleep(random.nextInt(1000)); // Think for a while synchronized(first) { // Grab first chopstick synchronized(second) { // Grab second chopstick Thread.sleep(random.nextInt(1000)); // Eat for a while } } } } catch(InterruptedException e) {} } }
程序解釋:當全部人同時決定進餐的時候,ABCD左手分別拿起1234號筷子(對於他們小的編號的筷子仍是在左手),這和上面的程序沒啥不一樣,可是差異就在這個E,他左邊的筷子是大編號,因此他左手拿的是1,然而1被A拿了,因此他就一隻筷子都拿不到,因此D能夠正常進餐,就不會死鎖
侷限:獲取鎖的代碼寫的比較集中的話,有利於維護這個全局順序,可是對於規模比較大的程序,使用鎖的地方比較零散,各處都遵照這個順序就顯得不太實際.
技巧:使用對象的散列值做爲鎖的全局順序
優勢:適用於全部java對象,不用爲鎖對象專門定義並維護一個順序,
缺點:可是對象的散列值不能保證惟一性(雖然概率很小), 不是無可奈何不要使用
if(System.identityHashCode(left) < System.identityHashCode(right)) { first = left; second = right; } else { first = right; second = left; }
private synchronized void updateProgress(int n) { for (ProgressListener listener: listeners) // listeners是累的一個field listener.onProgress(n); }
private void updateProgress(int n) { ArrayList<ProgressListener> listenersCopy; synchronized(this) { listenersCopy = (ArrayList<ProgressListener>)listeners.clone(); } for (ProgressListener listener: listenersCopy) listener.onProgress(n); }
Lock lock = new ReentrantLock(); lock.lock(); try{ //使用共享資源 } finally { //使用finally確保鎖釋放 lock.unlock(); }
final ReentrantLock l1 = new ReentrantLock(); final ReentrantLock l2 = new ReentrantLock(); Thread t1 = new Thread() { public void run() { try { l1.lockInterruptibly(); Thread.sleep(1000); l2.lockInterruptibly(); } catch (InterruptedException e) { System.out.println("t1 interrupted"); } } };
class Philosopher extends Thread { private ReentrantLock leftChopstick, rightChopstick; private Random random; private int thinkCount; public Philosopher(ReentrantLock leftChopstick, ReentrantLock rightChopstick) { this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick; random = new Random(); } public void run() { try { while(true) { ++thinkCount; if (thinkCount % 10 == 0) System.out.println("Philosopher " + this + " has thought " + thinkCount + " times"); Thread.sleep(random.nextInt(1000)); // Think for a while leftChopstick.lock(); try { if (rightChopstick.tryLock(1000, TimeUnit.MILLISECONDS)) { // Got the right chopstick try { Thread.sleep(random.nextInt(1000)); // Eat for a while } finally { rightChopstick.unlock(); } } else { // Didn't get the right chopstick - give up and go back to thinking System.out.println("Philosopher " + this + " timed out"); } } finally { leftChopstick.unlock(); } } } catch(InterruptedException e) {} } }
交替鎖能夠只鎖住鏈表的一部分,容許不涉及被鎖部分的其餘線程自由訪問鏈表.插入新的鏈表節點時,須要將待插入位置兩邊的節點加鎖.首先鎖住鏈表的前兩個節點,若是這兩個節點之間不是待插入的位置,那麼就解鎖第一個,並鎖住第三個,以此類推,知道找到待插入位置並插入新的節點,最後解鎖兩邊的節點
這種交替型的加鎖和解鎖順序沒法用內置鎖實現,使用ReentrantLock能夠
class ConcurrentSortedList { private class Node { int value; Node prev; Node next; ReentrantLock lock = new ReentrantLock(); Node() {} Node(int value, Node prev, Node next) { this.value = value; this.prev = prev; this.next = next; } } private final Node head; private final Node tail; public ConcurrentSortedList() { head = new Node(); tail = new Node(); head.next = tail; tail.prev = head; } //insert方法是有序的 遍歷列表直到找到第一個值小於等於新插入的值得節點,在這個位置插入 public void insert(int value) { Node current = head; current.lock.lock(); Node next = current.next; try { while (true) { next.lock.lock(); try { if (next == tail || next.value < value) { Node node = new Node(value, current, next); next.prev = node; current.next = node; //!!!這裏return要在兩個finally都執行完後纔會執行啊!!!但只是finally裏的.不過要是return換成exit(0)就直接退出了 return; } } finally { current.lock.unlock(); } current = next; next = current.next; } } finally { next.lock.unlock(); } } public int size() { Node current = tail; int count = 0; while (current.prev != head) { ReentrantLock lock = current.lock; lock.lock(); try { ++count; current = current.prev; } finally { lock.unlock(); } } return count; } public boolean isSorted() { Node current = head; while (current.next.next != tail) { current = current.next; if (current.value < current.next.value) return false; } return true; } } class LinkedList { public static void main(String[] args) throws InterruptedException { final ConcurrentSortedList list = new ConcurrentSortedList(); final Random random = new Random(); class TestThread extends Thread { public void run() { for (int i = 0; i < 10000; ++i) list.insert(random.nextInt()); } } class CountingThread extends Thread { public void run() { while (!interrupted()) { System.out.print("\r" + list.size()); System.out.flush(); } } } Thread t1 = new TestThread(); Thread t2 = new TestThread(); Thread t3 = new CountingThread(); //注意一下這裏的用法 這裏先join再interrupted的用法 t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.interrupt(); System.out.println("\r" + list.size()); if (list.size() != 20000) System.out.println("*** Wrong size!"); if (!list.isSorted()) System.out.println("*** Not sorted!"); } }
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); try { while (! « condition is true » ) { condition.await(); } //使用共享資源 } finally { lock.unlock(); }
class Philosopher extends Thread { private boolean eating; private Philosopher left; private Philosopher right; private ReentrantLock table; private Condition condition; private Random random; private int thinkCount; public Philosopher ( ReentrantLock table ) { eating = false; this.table = table; condition = table.newCondition(); random = new Random(); } public void setLeft ( Philosopher left ) { this.left = left; } public void setRight ( Philosopher right ) { this.right = right; } public void run () { try { while ( true ) { think(); eat(); } } catch ( InterruptedException e ) { } } private void think () throws InterruptedException { table.lock(); try { eating = false; left.condition.signal(); right.condition.signal(); } finally { table.unlock(); } ++thinkCount; if ( thinkCount % 10 == 0 ) { System.out.println( "Philosopher " + this + " has thought " + thinkCount + " times" ); } Thread.sleep( 1000 ); } private void eat () throws InterruptedException { table.lock(); try { while ( left.eating || right.eating ) { condition.await(); } eating = true; } finally { table.unlock(); } Thread.sleep( 1000 ); } }
//Downloader.java private CopyOnWriteArrayList<ProgressListener> listeners; public void addListener(ProgressListener listener) { listeners.add(listener); } public void removeListener(ProgressListener listener) { listeners.remove(listener); } private void updateProgress(int n) { for (ProgressListener listener: listeners) listener.onProgress(n); }
//生產者 將page加到隊尾 class Parser implements Runnable { private BlockingQueue<Page> queue; public Parser(BlockingQueue<Page> queue) { this.queue = queue; } public void run() { try { Iterable<Page> pages = new Pages(100000, "enwiki.xml"); for (Page page: pages) queue.put(page); } catch (Exception e) { e.printStackTrace(); } } } //消費者 class Counter implements Runnable { private BlockingQueue<Page> queue; private Map<String, Integer> counts; public Counter(BlockingQueue<Page> queue, Map<String, Integer> counts) { this.queue = queue; this.counts = counts; } public void run() { try { while(true) { Page page = queue.take(); if (page.isPoisonPill()) break; Iterable<String> words = new Words(page.getText()); for (String word: words) countWord(word); } } catch (Exception e) { e.printStackTrace(); } } private void countWord(String word) { Integer currentCount = counts.get(word); if (currentCount == null) counts.put(word, 1); else counts.put(word, currentCount + 1); } }
public static void main(String[] args) throws Exception { ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100); HashMap<String, Integer> counts = new HashMap<String, Integer>(); Thread counter = new Thread(new Counter(queue, counts)); Thread parser = new Thread(new Parser(queue)); long start = System.currentTimeMillis(); counter.start(); parser.start(); parser.join(); queue.put(new PoisonPill()); counter.join(); long end = System.currentTimeMillis(); System.out.println("Elapsed time: " + (end - start) + "ms"); }
private void countWord(String word) { lock.lock(); try { Integer currentCount = counts.get(word); if (currentCount == null) counts.put(word, 1); else counts.put(word, currentCount + 1); } finally { lock.unlock(); } }
ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < NUM_COUNTERS; ++i) executor.execute(new Counter(queue, counts)); Thread parser = new Thread(new Parser(queue)); parser.start(); parser.join(); for (int i = 0; i < NUM_COUNTERS; ++i) queue.put(new PoisonPill()); executor.shutdown();
//改用 ConcurrentHashMap private void countWord(String word) { while (true) { //理解一下這裏的循環 若是下面的操做沒有成功的話,就重試 Integer currentCount = counts.get(word); if (currentCount == null) { if (counts.putIfAbsent(word, 1) == null) //若是沒有與1關聯 則關聯,有原子性 break; } else if (counts.replace(word, currentCount, currentCount + 1)) { break; } }
class Counter implements Runnable { private BlockingQueue<Page> queue; private ConcurrentMap<String, Integer> counts; private HashMap<String, Integer> localCounts; public Counter(BlockingQueue<Page> queue, ConcurrentMap<String, Integer> counts) { this.queue = queue; this.counts = counts; localCounts = new HashMap<String, Integer>(); } public void run() { try { while(true) { Page page = queue.take(); if (page.isPoisonPill()) break; Iterable<String> words = new Words(page.getText()); for (String word: words) countWord(word); } //因此計數的那個能夠是普通的map 他只在本身的線程裏 mergeCounts(); } catch (Exception e) { e.printStackTrace(); } } private void countWord(String word) { Integer currentCount = localCounts.get(word); if (currentCount == null) localCounts.put(word, 1); else localCounts.put(word, currentCount + 1); } private void mergeCounts() { for (Map.Entry<String, Integer> e: localCounts.entrySet()) { String word = e.getKey(); Integer count = e.getValue(); while (true) { Integer currentCount = counts.get(word); if (currentCount == null) { if (counts.putIfAbsent(word, count) == null) break; } else if (counts.replace(word, currentCount, currentCount + count)) { break; } } } } }