CAS:http://huangyunbin.iteye.com/blog/1942369併發
Future:http://blog.csdn.net/liulipuo/article/details/39029643異步
CAS基於衝突檢測的無鎖併發策略,性能也較高。CAS操做有3個操做數,內存值M,預期值E,新值U,若是M==E,則將內存值修改成B,不然啥都不作。性能
public class TestUnsafe { private static Unsafe unsafe; static{ try { //經過反射獲取rt.jar下的Unsafe類 Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe) field.get(null); } catch (Exception e) { System.out.println("Get Unsafe instance occur error"+ e); } } public static void main(String[] args) throws Exception { Class clazz = Target.class; Field[] fields = clazz.getDeclaredFields(); System.out.println("fieldName:fieldOffset"); for (Field f : fields) { // 獲取屬性偏移量,能夠經過這個偏移量給屬性設置 System.out.println(f.getName() + ":" + unsafe.objectFieldOffset(f)); } Target target = new Target(); Field intFiled = clazz.getDeclaredField("intParam") ; int a=(Integer)intFiled.get(target ) ; System.out.println("原始值是:"+a); //intParam的字段偏移是12 原始值是3 咱們要改成10 System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10)); int b=(Integer)intFiled.get(target) ; System.out.println("改變以後的值是:"+b); //這個時候已經改成10了,因此會返回false System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10)); //false //判斷target對象的偏移量24位置,即strParam,若是該變量值=null,則給strParam賦值5 System.out.println(unsafe.compareAndSwapObject(target, 24, null, "5")); //true } } class Target{ int intParam=3; long longParam; String strParam; String strParam2; }
測試結果:測試
futureTask的異步計算功能就不介紹了,主要分析源碼,下圖是futureTask的結構:this
NEW 新建 0
COMPLETING 執行中 1
NORMAL 正常 2
EXCEPTIONAL 異常 3
CANCELLED 取消 4
INTERRUPTING 中斷中 5
INTERRUNPED 被中斷 6
state的狀態變化能夠有四種方式
NEW->COMPLETING->NORMAL 正常完成的流程
NEW->COMPLETING->EXCEPTIONAL 出現異常的流程
NEW->CANCELED 被取消
NEW->INTERRUNPING->INTERRRUNPTED 被中斷spa
建立FutureTask.net
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //state初始化NEW狀態 this.state = NEW; // ensure visibility of callable }
run方法:線程
1 public void run() { 2 //若是當前state不是建立狀態,且已經在運行。則直接return。 3 if (state != NEW || 4 !UNSAFE.compareAndSwapObject(this, runnerOffset, 5 null, Thread.currentThread())) 6 return; 7 //開始執行任務 8 try { 9 Callable<V> c = callable; 10 //建立狀態,且callable不爲空,開始執行調用call() 11 if (c != null && state == NEW) { 12 V result; 13 boolean ran; 14 try { 15 result = c.call(); 16 ran = true; 17 } catch (Throwable ex) { 18 result = null; 19 ran = false; 20 setException(ex); 21 } 22 //成功調用任務,執行set()方法 23 if (ran) 24 set(result); 25 } 26 } finally { 27 // runner must be non-null until state is settled to 28 // prevent concurrent calls to run() 29 //不管是否成功,都要把runner設置爲null 30 runner = null; 31 // state must be re-read after nulling runner to prevent 32 // leaked interrupts 33 int s = state; 34 if (s >= INTERRUPTING) 35 handlePossibleCancellationInterrupt(s); 36 } 37 }
接下來看一下24行的set方法:3d
1 protected void set(V v) { 2 //CAS設置state NEW -> COMPLETING 3 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 4 //結果賦值outcome 5 outcome = v; 6 //CAS將state設置爲NORMAL 7 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 8 //喚醒全部的等待線程,同時設置callable=null,調用done(),具體實現後面詳細介紹 9 finishCompletion(); 10 } 11 }
cancel()取消code
1 public boolean cancel(boolean mayInterruptIfRunning) { 2 //若是state已經不是NEW 或者NEW不能設置成INTERRUPTING , 則直接返回false 。 3 if (!(state == NEW && 4 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 5 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 6 return false; 7 try { // in case call to interrupt throws exception 8 //若是是可中斷 那麼就 調用系統中斷方法 而後把狀態設置成INTERRUPTED 9 if (mayInterruptIfRunning) { 10 try { 11 Thread t = runner; 12 if (t != null) 13 //中斷執行線程 14 t.interrupt(); 15 } finally { // final state 16 //最後設置state爲INTERRUPTED 17 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 18 } 19 } 20 } finally { 21 finishCompletion(); 22 } 23 return true; 24 }
再來看一下get操做:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; //若是還沒完成,而且等待timeout也沒有完成,則拋出超時異常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
awaitDone()操做來如何實現超時,已經完成但未超時,則結束等待返回結果,看具體代碼:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //超時時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //循環執行 for (;;) { //中斷處理 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //首先判斷是否完成,若是完成,直接返回state結果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //未完成執行,則將當前線程(主線程)設置成等待節點並放入到等待隊列中 else if (q == null) q = new WaitNode();
//將waiters設置成q的next節點,並替換,改操做只作一次,將main線程加入到隊列中,只加一次 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //若是啓動了超時等待 else if (timed) { //計算剩餘超時時間 nanos = deadline - System.nanoTime(); //超時,移除隊列中的等待線程,返回state,上層判斷state,並報出超時異常 if (nanos <= 0L) { removeWaiter(q); return state; } //還未超時,則等待nanos時間,然而若是task執行接受,並不會一直等待,在run()中的set()中,會執行finishCompleted(),將對待隊列線性喚醒,調用LockSupport.unpark()方法來喚醒等待線程 LockSupport.parkNanos(this, nanos); } else //未啓用超時功能,在未完成callable,則無限期等待 LockSupport.park(this); } }
最後看一下finishCompletion()方法,調用callable,是如何喚醒main線程
private void finishCompletion() { // assert state > COMPLETING; //遍歷等待隊列 for (WaitNode q; (q = waiters) != null;) { //將waiters的頭結點中線程設置null,即移除某個等待線程 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; //若是等待線程不爲null 調用LockSupport.unpark(t)喚醒,而後操做下一個等待線程 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
至此,FutureTask的源碼解析已經完成,接下來看一下開源對於異步處理的解決方案。
觸類旁通:
ChannelFuture
ListenableFuture