前面一個系列的文章都在圍繞hash展開,今天準備先說下concurrent包,這個系列可能會以使用場景說明爲主,concurrent包自己的代碼分析可能比較少; 我在這方面的實踐經驗較爲有限,有錯誤歡迎批評指正 html
不過前一個系列並未結束,還有一些文章沒有放出來,歡迎關注核桃博客2. concurrent包裏面的一些操做是基於硬件級別的CAS(compare and swap),就是在cpu級別提供了原子操做,簡單的說就能夠提供無阻塞、無鎖定的算法; 而現代cpu大部分都是支持這樣的算法的;java
前面一篇說了concurrent包的基本結構,接下來首先看一下一個很是有用的類,CountDownLatch, 能夠用來在一個線程中等待多個線程完成任務的類; 算法
前面一篇說了concurrent包的基本結構,接下來首先看一下一個很是有用的類,CountDownLatch, 能夠用來在一個線程中等待多個線程完成任務的類;Java代碼 編程
@Test public void demoCountDown() { int count = 10; final CountDownLatch l = new CountDownLatch(count); for(int i = 0; i < count; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread " + index + " has finished..."); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("now all threads have finished"); }運行的結果
前面10個線程的執行完成順序會變化,可是最後一句始終會等待前面10個線程都完成以後纔會執行數組
有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來咱們看下Atomic相關的類, 好比AtomicLong, AtomicInteger等這些; 緩存
有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來咱們看下Atomic相關的類, 好比AtomicLong, AtomicInteger等這些;Java代碼 安全
set() 多線程
get() 併發
getAndSet() jvm
getAndIncrement()
getAndDecrement()
getAndAdd()
Java代碼
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; /** * * by http://www.hetaoblog.com * @author hetaoblog * */ public class AtomicTest { @Test public void testAtomic() { final int loopcount = 10000; int threadcount = 10; final NonSafeSeq seq1 = new NonSafeSeq(); final SafeSeq seq2 = new SafeSeq(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq1.inc(); seq2.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("NonSafeSeq:" + seq1.get()); System.out.println("SafeSeq with atomic: " + seq2.get()); } } class NonSafeSeq{ private long count = 0; public void inc() { count++; } public long get() { return count; } } class SafeSeq{ private AtomicLong count = new AtomicLong(0); public void inc() { count.incrementAndGet(); } public long get() { return count.longValue(); } }其中NonSafeSeq是做爲對比的類,直接放一個private long count不是線程安全的,而SafeSeq裏面放了一個AtomicLong,是線程安全的;能夠直接調用incrementAndGet來增長
注意,這個例子也說明,雖然long自己的單個設置是原子的,要麼成功要麼不成功,可是諸如count++這樣的操做就不是線程安全的;由於這包括了讀取和寫入兩步操做;
在jdk 1.4時代,線程間的同步主要依賴於synchronized關鍵字,本質上該關鍵字是一個對象鎖,能夠加在不一樣的instance上或者class上,從使用的角度則分別能夠加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法;
在jdk 1.4時代,線程間的同步主要依賴於synchronized關鍵字,本質上該關鍵字是一個對象鎖,能夠加在不一樣的instance上或者class上,從使用的角度則分別能夠加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法;Java代碼
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class ReentrantLockDemo { @Test public void demoLock() { final int loopcount = 10000; int threadcount = 10; final SafeSeqWithLock seq = new SafeSeqWithLock(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("SafeSeqWithLock:" + seq.get()); } } class SafeSeqWithLock{ private long count = 0; private ReentrantLock lock = new ReentrantLock(); public void inc() { lock.lock(); try{ count++; } finally{ lock.unlock(); } } public long get() { return count; } }一樣之前面的相似Sequence的類舉例,經過對inc操做加鎖,保證了線程安全;
SafeSeqWithLock:100000
concurrent包裏面還提供了一個很是有用的鎖,讀寫鎖ReadWriteLock
concurrent包裏面還提供了一個很是有用的鎖,讀寫鎖ReadWriteLockJava代碼
@Test public void testRWLock_getw_onr() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final Lock rlock = lock.readLock(); final Lock wlock = lock.writeLock(); final CountDownLatch l = new CountDownLatch(2); // start r thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get rlock"); rlock.lock(); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "now to unlock rlock"); rlock.unlock(); l.countDown(); } }).start(); // start w thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get wlock"); wlock.lock(); System.out.println(new Date() + "now to unlock wlock"); wlock.unlock(); l.countDown(); } }).start(); try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "finished"); }這代碼在我機器上打印的結果是, 也就是試圖得到寫鎖的線程只有當另一個線程將讀鎖釋放了之後才能夠得到
Java代碼
@Test public void testRWLock_downgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }能夠正常打印出
Java代碼
@Test public void testRWLock_upgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }只能打印出下面兩句,後面就一直掛住了
now to get wlock
有網友建議我在介紹concurrent包以前先介紹下jdk1.5以前的多線程知識,這是個至關不錯的想法, 這篇就先介紹下Thread類;
有網友建議我在介紹concurrent包以前先介紹下jdk1.5以前的多線程知識,這是個至關不錯的想法, 這篇就先介紹下Thread類;Java代碼
public class ThreadDemo { @Test public void testThread() { SimpleThread t = new SimpleThread(); t.start(); } } class SimpleThread extends Thread{ @Override public void run() { System.out.println( Thread.currentThread().getName() + " is running "); } }一般在run方法裏面實現本身要作的功能,這裏簡單的打印了了一句話, 運行結果是
Java代碼
public class ThreadDemo { public static void main(String[] args) { PeriodicalRunningThread t = new PeriodicalRunningThread(); t.start(); System.out.println("main thread is going to sleep..."); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " now to stop PeriodicalRunningThread"); t.setRunning(false); } } class PeriodicalRunningThread extends Thread{ private volatile boolean running = true; @Override public void run() { while(running) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " is running " + new Date()); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end"); } public void setRunning(boolean running) { this.running = running; } }這段代碼的打印結果是:
這樣,在這個running標識爲true的時候,該線程一直在跑,可是完成一段任務後會sleep一段時間,而後繼續執行;
這篇仍是Thread和Runnable的基礎
這篇仍是Thread和Runnable的基礎Java代碼
public static void testThreadWithRunnable() { final String word = "hello,world"; new Thread(new Runnable() { @Override public void run() { System.out.println(word); } }).start(); } public static void main(String[] args) { //periodicalThreadTest(); testThreadWithRunnable(); }上面的代碼會打印
hello,world
下一節我會結合具體的代碼來介紹下Condition的使用;
Java代碼
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }代碼意思不復雜,一個有界的buffer,裏面是個數組,能夠往裏面放數據和取數據;
c. 調用notempty.signal(); 若是有線程在take()的時候await住了,那麼就會被通知到,能夠繼續進行操做
前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer;
前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer;Java代碼
package com.hetaoblog.concurrent.test; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class BoundedBufferTest { @Test public void testPutTake() { final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i) { final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { try { bb.put(index); System.out.println(new Date() + " put finished: " + index); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to take for count: " + count); for(int i =0; i < count; ++i) { Thread t= new Thread(new Runnable() { @Override public void run() { try { Object o = bb.take(); System.out.println(new Date() + " take get: " + o); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { System.out.println(new Date() + " put is to wait...."); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " take is going to wait.."); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }下面是這段程序在我機器上的運行結果:
在幾回不一樣的執行中,始終能夠觀察到任什麼時候候,未完成的take()線程數>= 未完成的put()線程; 在未完成的線程數相等的狀況下,即便jvm首先調度到了take()線程,也會進入notEmpty.await()釋放鎖,進入等待
前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操做,這篇說一下先take()再put()的操做;
前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操做,這篇說一下先take()再put()的操做;Java代碼
@Test public void testTakePut() { final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " first try to call take for count: " + count); for(int i =0; i < count; ++i) { final int index = i; Thread t= new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().setName(" TAKE " + index); Object o = bb.take(); System.out.println(new Date() + " " + " take get: " + o ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i) { final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName(" PUT " + index); try { bb.put(index); System.out.println(new Date() + " " + " put finished: " + index ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " put is to wait....: " + System.currentTimeMillis()); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis()); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }運行結果1: Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10 Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait.. Fri Mar 16 20:50:20 CST 2012 now try to call put for 10 Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7 Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9 Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8 Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait.... Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait.... Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5 Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait.... Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8 Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9 Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0 Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6 Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5 Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2 Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3 Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1 Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2 Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3 Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1 Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4 Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6 Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0 Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4 Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7 Fri Mar 16 20:50:20 CST 2012 all threads finished 注意到紅色部分: 第一個加爲紅色是由於按照打印結果,put()只完成了3次,就開始有put()進入等待了,而BoundedBuffer的大小是5,理論上應該沒有滿的! 第二個加爲紅色是由於元素4居然先被take,而後再被put! 顯然程序有地方出錯了!具體緣由分析,歡迎關注核桃博客:)