同步控制不只有synchronized配合object.wait()以及object.notify(),也有加強版的reentrantLock(重入鎖)java
public class ReenterLock implements Runnable{ public static ReentrantLock lock=new ReentrantLock(); public static int i=0; @Override public void run() { for(int j=0;j<10000000;j++){ lock.lock(); lock.lock(); //此處演示重入性 try{ i++; }finally{ lock.unlock(); //退出臨界區必須解鎖 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl=new ReenterLock(); Thread t1=new Thread(tl); Thread t2=new Thread(tl); t1.start();t2.start(); t1.join();t2.join(); System.out.println(i); //計算結果爲 20000000 } }
咱們來看下reentrantlock相比synchronized鎖有何優勢:算法
面對死鎖,彷佛synchronized沒有任何主動解決策略,而reentrantlock則能夠輕鬆解決緩存
public class IntLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; /** * 控制加鎖順序,方便構造死鎖 * @param lock */ public IntLock(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); //可中斷的加鎖 try{ Thread.sleep(500); }catch(InterruptedException e){} lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try{ Thread.sleep(500); }catch(InterruptedException e){} lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println(Thread.currentThread().getName()+":線程被中斷"); } finally { if (lock1.isHeldByCurrentThread()) lock1.unlock(); if (lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getName()+":線程退出"); } } public static void main(String[] args) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1,"線程1"); Thread t2 = new Thread(r2,"線程2"); t1.start();t2.start(); Thread.sleep(1000); //中斷其中一個線程 t2.interrupt(); } } // 輸出結果: // java.lang.InterruptedException // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898) // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) // at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) // at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31) // at java.lang.Thread.run(Thread.java:745) // 線程2:線程被中斷 // 線程2:線程退出 // 線程1:線程退出
由上可知,當t1,t2造成死鎖時,能夠主動利用中斷來解開,但完成任務的只有t1,t2被中斷. 而若是換成synchronized則將沒法進行中斷多線程
lock1.tryLock(); //嘗試獲取鎖,得到當即返回true,未得到當即返回false lock1.tryLock(5, TimeUnit.SECONDS); //嘗試獲取鎖,5秒內未得到則返回false,得到返回true
public class TryLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public TryLock(int lock) { this.lock = lock; } @Override public void run() { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock2.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock1.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock1.unlock(); } } } finally { lock2.unlock(); } } } } } public static void main(String[] args) throws InterruptedException { TryLock r1 = new TryLock(1); TryLock r2 = new TryLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } } // 15:My Job done // 14:My Job done
使用trylock能夠有效地避免產生死鎖框架
synchronized鎖爲非公平鎖,而reentrantLock既能夠是公平鎖也能夠是非公平鎖
非公平鎖容易產生飢餓,公平鎖先進先出,但效率不敵非公平鎖dom
public ReentrantLock(boolean fair)
Condition和object.wait(),object.notify()方法相似
condition的基本方法以下:ide
void await() throws InterruptedException; //使當前線程等待,釋放鎖,能響應signal和signalAll方法,響應中斷 void awaitUninterruptibly(); //相似 await,但不響應中斷 long awaitNanos(long nanosTimeout)throws InterruptedException; //等待一段時間 boolean await (long time,TimeUnit unit)throws InterruptedException; boolean awaitUntil(Date deadline)throws InterruptedException; void signal(); //喚醒一個等待中的線程 void signalAll(); //喚醒全部等待中的線程
JDK內部就有不少對於ReentrantLock的使用,如ArrayBlockingQueue函數
//在 ArrayBlockingQueue中的一些定義 boolean fair = true; private final ReentrantLock lock = new ReentrantLock(fair); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); //put(方法的實現 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //put方法作同步 try { try { while (count == items.length) //隊列已滿 notFull.await(); //等待隊列有足夠的空間 } catch (InterruptedException ie) { notFull.signal(); throw ie; } insert(e); //notFull被通知時,說明有足夠的空間 } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notFull.signal(); //通知take方法的線程,隊列已有數據 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //對take()方法作同步 try { try { while (count == 0) //若是隊列爲空 notEmpty.await(); //則消費者隊列要等待一個非空的信號 } catch (InterruptedException ie) { notEmpty.signal(); throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); //通知put線程隊列已有空閒空間 return x; }
同步鎖只能容許一個線程進行訪問,信號量能夠指定多個線程同時訪問同一個資源.工具
//構造方法 public Semaphore(int permits) //傳入int表示能同時訪問的線程數 public Semaphore(int permits, boolean fair) //線程數,是否公平鎖 //實例方法 public void acquire() throws InterruptedException //獲取一個訪問權限,會阻塞線程,會被打斷 public void acquireUninterruptibly() //獲取一個訪問權限,會阻塞線程,不會被打斷 public boolean tryAcquire() //獲取一個訪問權限,當即返回 public boolean tryAcquire(long timeout, TimeUnit unit) //獲取一個訪問權限,嘗試一段時間 public void release() //釋放一個訪問權限
public class SemapDemo implements Runnable { final Semaphore semp = new Semaphore(5); @Override public void run() { try { semp.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + ":done!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semp.release(); //使用完後要釋放,不然會引發信號量泄漏 } } public static void main(String[] args) { ExecutorService exec = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for (int i = 0; i < 20; i++) { exec.submit(demo); } } } //輸出結果 //每次輸出5個結果,對應信號量的5個許可
讀寫鎖適用於讀多寫少的場景,讀讀之間爲並行,讀寫之間爲串行,寫寫之間也爲串行
public class ReadWriteLockDemo { private static Lock lock=new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //獲取讀寫鎖 private static Lock readLock = readWriteLock.readLock(); //讀鎖 private static Lock writeLock = readWriteLock.writeLock(); //寫鎖 private int value; public Object handleRead(Lock lock) throws InterruptedException{ try{ lock.lock(); //模擬讀操做 Thread.sleep(1000); //讀操做的耗時越多,讀寫鎖的優點就越明顯 return value; }finally{ lock.unlock(); } } public void handleWrite(Lock lock,int index) throws InterruptedException{ try{ lock.lock(); //模擬寫操做 Thread.sleep(1000); value=index; }finally{ lock.unlock(); } } public static void main(String[] args) { final ReadWriteLockDemo demo=new ReadWriteLockDemo(); Runnable readRunnale=new Runnable() { @Override public void run() { try { demo.handleRead(readLock); // demo.handleRead(lock); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable writeRunnale=new Runnable() { @Override public void run() { try { demo.handleWrite(writeLock,new Random().nextInt()); // demo.handleWrite(lock,new Random().nextInt()); } catch (InterruptedException e) { e.printStackTrace(); } } }; for(int i=0;i<18;i++){ new Thread(readRunnale).start(); } for(int i=18;i<20;i++){ new Thread(writeRunnale).start(); } } } //結果: //讀寫鎖明顯要比單純的鎖要更快結束,說明讀寫鎖確實提高很多效率
讓一個線程等待,知道倒計時結束
public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); //構造倒計時器,倒計數爲10 static final CountDownLatchDemo demo=new CountDownLatchDemo(); @Override public void run() { try { //模擬檢查任務 Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check complete"); end.countDown(); //倒計時器減1 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ exec.submit(demo); } //等待檢查 end.await(); //主線程阻塞,待其餘線程所有完成後再喚醒主線程 //發射火箭 System.out.println("Fire!"); exec.shutdown(); } }
循環柵欄相似於倒計時器,可是計數器能夠反覆使用,cyclicBarrier比CountDownLatch稍微強大些,能夠傳入一個barrierAction,barrierAction指每次完成計數便出發一次
public CyclicBarrier(int parties,Runnable barrierAction) //構造方法
public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } public void run() { try { //等待全部士兵到齊 cyclic.await(); //觸發一次循環柵欄,達到計數器後纔會進行下一步工做 doWork(); //等待全部士兵完成工做 cyclic.await(); //再次觸發循環柵欄,達到計數器後纔會進行下一步工做 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); //模擬工做 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任務完成"); } } public static class BarrierRun implements Runnable { //用於傳入CyclicBarrier的構造方法,做爲達到計數器數值後的觸發任務, 能夠被屢次調用 boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } public void run() { if (flag) { System.out.println("司令:[士兵" + N + "個,任務完成!]"); } else { System.out.println("司令:[士兵" + N + "個,集合完畢!]"); flag = true; } } } public static void main(String args[]) throws InterruptedException { final int N = 10; Thread[] allSoldier=new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); //設置屏障點,主要是爲了執行這個方法 System.out.println("集合隊伍!"); for (int i = 0; i < N; ++i) { System.out.println("士兵 "+i+" 報道!"); allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i)); allSoldier[i].start(); } } }
注意: 一旦其中一個被interrupt後,極可能會拋出一個interruptExpection和9個BrokenBarrierException,表示該循環柵欄已破損,防止其餘線程進行無所謂的長久等待
LockSupport是一個很是實用的線程阻塞工具,不須要獲取某個對象的鎖(如wait),也不會拋出interruptedException異常
public static void park() //掛起當前線程, public static void park(Object blocker) //掛起當前線程,顯示阻塞對象,parking to wait for <地址值>
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(this); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); //即便unpark發生在park前,也可使程序正常結束 t1.join(); t2.join(); } }
LockSupport使用了相似信號量的機制,它爲每一個線程準備一個許可,若是許可可用,park當即返回,而且消費這個許可(轉爲不可用),若是許可不可用,就會阻塞,而unpark方法就是使一個許可變爲可用 locksupport.park()能夠相應中斷,可是不會拋出interruptedException,咱們能夠用Thread.interrupted等方法中獲取中斷標記.
public class LockSupportIntDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(); if(Thread.interrupted()){ //檢測到中斷位,並清除中斷狀態 System.out.println(getName()+" 被中斷了"); } if (Thread.currentThread().isInterrupted()){ //中斷狀態已被清除,沒法檢測到 System.out.println(1); } } System.out.println(getName()+"執行結束"); } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); t1.interrupt(); LockSupport.unpark(t2); } } //輸出: //in t1 //t1 被中斷了 //t1執行結束 //in t2 //t2執行結束
限流算法通常有兩種:漏桶算法和令牌桶算法 漏桶算法: 利用緩存區,全部請求進入系統,都在緩存區中保存,而後以固定的流速流出緩存區進行處理. 令牌桶算法: 桶中存放令牌,每一個請求拿到令牌後才能進行處理,若是沒有令牌,請求要麼等待,要麼丟棄.RateLimiter就是採用這種算法
public class RateLimiterDemo { static RateLimiter limiter = RateLimiter.create(2); //每秒處理2個請求 public static class Task implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis()); } } public static void main(String args[]) throws InterruptedException { for (int i = 0; i < 50; i++) { limiter.acquire(); //過剩流量會等待 new Thread(new Task()).start(); } } } // 某些場景傾向於丟棄過剩流量,tryAcquire則是當即返回,不會阻塞 // for (int i = 0; i < 50; i++) { // if(!limiter.tryAcquire()) { // continue; // } // new Thread(new Task()).start(); // }
Executor框架提供了各類類型的線程池,主要有如下工廠方法:性能
//固定線程數量,當有新任務提交時,若池中有空閒線程則當即執行,若沒有空閒線程,任務會被暫存在一個任務隊列中,直到有空閒線程 public static ExecutorService newFixedThreadPool(int nThreads) //返回只有一個線程的線程池,多餘任務被保存到一個任務隊列中,線程空閒時,按先入先出的順序執行隊列中的任務 public static ExecutorService newSingleThreadPoolExecutor() //線程數量不固定,優先使用空閒線程,多餘任務會建立新線程 public static ExecutorService newCachedThreadPool() //線程數量爲1,給定時間執行某任務,或週期性執行任務 public static ScheduledExecutorService newSingleThreadScheduledExecutor() //線程數量能夠指定,定時或週期性執行任務 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
計劃任務:newScheduledThreadPool主要方法
//給定時間,對任務進行一次調度 public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); //週期調度,以任務完成後間隔固定時間調度下一個任務,(二者相加) public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit); //週期調度,兩個任務開始的時間差爲固定間隔,若是任務時間大於間隔時間則以任務時間爲準(二者取其大者) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit);
注意: 任務異常時後續全部任務都將中止調度,所以必須保證全部任務異常均被正常處理.
ThreadPoolExecutor構造函數:
public ThreadPoolExecutor(int corePoolSize, //核心線程池大小 int maximumPoolSize, //最大線程池大小 long keepAliveTime, //線程池中超過corePoolSize數目的空閒線程最大存活時間;能夠allowCoreThreadTimeOut(true)使得核心線程有效時間 TimeUnit unit, //keepAliveTime時間單位 BlockingQueue<Runnable> workQueue, //阻塞任務隊列 ThreadFactory threadFactory, //新建線程工廠 RejectedExecutionHandler handler ) //當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
workQueue指被提交可是未執行的任務隊列,是BlockingQueue接口的對象
1.直接提交隊列:SynchronousQueue,該隊列沒有容量,每一個插入操做對應一個刪除操做,即提交的任務老是會交給線程執行,若是沒有空閒進程,則建立新線程,數量達最大則執行拒絕策略,通常須要設置很大的maximumPoolSize
2.有界任務隊列:ArrayBlockingQueue,有新任務時,若線程池的實際線程數小於corePoolSize,優先建立新線程,若大於corePoolSize,加入到等待隊列,若隊列已滿,不大於maximumPoolSize前提下,建立新線程執行;當且僅當等待隊列滿時纔會建立新線程,不然數量一直維持在corePoolSize
3.無界任務隊列:LinkedBlockingQueue,小於corePoolSize時建立線程,達到corePoolSize則加入隊列直到資源消耗殆盡
4.優先任務隊列:PriorityBlockingQueue,特殊無界隊列,老是保證高優先級的任務先執行.
newFixedThreadPool: corePoolSize=maximumPoolSize,線程不會超過corePoolSize,使用LinkedBlockingQueue newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1 newCachedThreadPool: corePoolSize=0,maximumPoolSize爲無窮大,空閒線程60秒回收,使用SynchronousQueue隊列
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //檢查是否小於corePoolSize if (addWorker(command, true)) //添加線程,執行任務 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //添加進隊列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //雙重校驗 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //提交線程池失敗 reject(command); //拒絕執行 }
**AbortPolicy**:該策略會直接拋出異常,阻止系統正常工做。 **CallerRunsPolicy**:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可 能會急劇降低。 任務,並嘗試再次提交當前任務。 **DiscardOldestPolicy**:該策略將丟棄最老的一個請求,也就是即將被執行的一個 **DiscardPolicy**:該策略默默地丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,我以爲這多是最好的一種方案了吧!
public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { //自定義建立線程的方法 Thread t= new Thread(r); t.setDaemon(true); System.out.println("create "+t); return t; } } ); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); }
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("線程池退出"); } }; for (int i = 0; i < 5; i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); //等待全部任務執行完畢後再關閉 } }
線程池中的異常堆棧可能不會拋出,須要咱們本身去包裝
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable task) { super.execute(wrap(task, clientTrace(), Thread.currentThread() .getName())); } @Override public Future<?> submit(Runnable task) { return super.submit(wrap(task, clientTrace(), Thread.currentThread() .getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); //外層包裹trycatch,便可打印出異常 } catch (Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
相似於mapreduce,用於大數據量,fork()創造子線程,join表示等待,
public class CountTask extends RecursiveTask<Long>{ private static final int THRESHOLD = 10000; //任務分解規模 private long start; private long end; public CountTask(long start,long end){ this.start=start; this.end=end; } @Override public Long compute(){ long sum=0; boolean canCompute = (end-start)<THRESHOLD; //表示計算極限10000,超出此值須要使用forkjoin if(canCompute){ for(long i=start;i<=end;i++){ sum +=i; } }else{ //分紅100個小任務 long step=(start+end)/100; ArrayList<CountTask> subTasks=new ArrayList<CountTask>(); long pos=start; for(int i=0;i<100;i++){ long lastOne=pos+step; if(lastOne>end)lastOne=end; //最後一個任務可能小於step,故須要此步 CountTask subTask=new CountTask(pos,lastOne); //子任務 pos+=step+1; //調整下一個任務 subTasks.add(subTask); subTask.fork(); //fork子任務 } for(CountTask t:subTasks){ sum+=t.join(); //聚合任務 } } return sum; } public static void main(String[]args){ ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000000000L); ForkJoinTask<Long> result = forkJoinPool.submit(task); try{ long res = result.get(); System.out.println("sum="+res); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } } }
注意: 若是任務的劃分層次不少,一直得不到返回,可能有兩種緣由: 1.系統內線程數量越積越多,致使性能嚴重降低 2.函數調用層次變多,致使棧溢出
1.特殊的DirectExecutor線程池
Executor executor=MoreExecutors.directExecutor(); // 僅在當前線程運行,用於抽象
2.Daemon線程池
提供將普通線程轉換爲Daemon線程.不少狀況下,咱們不但願後臺線程池阻止程序的退出
public class MoreExecutorsDemo2 { public static void main(String[] args) { ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2); MoreExecutors.getExitingExecutorService(exceutor); exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName())); } }
3.future模式擴展待續....