規模爲N的問題,N<閾值,直接解決,N>閾值,將N分解爲K個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合併獲得原問題的解java
動態規範sql
就是在任務分割的時候,前面的任務執行可能會比後面的執行速度快,當前面的執行完,後面的還沒執行的時候,執行完前面的任務的線程不會中止,而是從後面的任務的尾部取出子任務繼續工做。Fork-Join就是實現了這樣的機制。數據庫
Fork/Join使用的標準範式數組
代碼以下:併發
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class SumArray { private static class SumTask extends RecursiveTask<Integer>{ private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; //表示咱們要實際統計的數組 private int fromIndex;//開始統計的下標 private int toIndex;//統計到哪裏結束的下標 public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { if(toIndex-fromIndex < THRESHOLD) { int count = 0; for(int i=fromIndex;i<=toIndex;i++) { //SleepTools.ms(1); count = count + src[i]; } return count; }else { //fromIndex....mid....toIndex //1...................70....100 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); invokeAll(left,right); return left.join()+right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步調用 System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } } public class MakeArray { //數組長度 public static final int ARRAY_LENGTH = 100000000; public static int[] makeArray() { //new一個隨機數發生器 Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for(int i=0;i<ARRAY_LENGTH;i++){ //用隨機數填充數組 result[i] = r.nextInt(ARRAY_LENGTH*3); } return result; } }
做用:是一組線程等待其餘的線程完成工做之後在執行,增強版joinapp
await用來等待,countDown負責計數器的減一框架
代碼示例:less
import java.util.concurrent.CountDownLatch; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); //初始化線程(只有一步,有4個) private static class InitThread implements Runnable{ @Override public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown();//初始化線程完成工做了,countDown方法只扣減一次; for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } //業務線程 private static class BusiThread implements Runnable{ @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { //單獨的初始化線程,初始化分爲2步,須要扣減兩次 new Thread(new Runnable() { @Override public void run() { SleepTools.ms(1); //休眠1s System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown();//每完成一步初始化工做,扣減一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown();//每完成一步初始化工做,扣減一次 } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); } }
讓一組線程達到某個屏障,被阻塞,一直到組內最後一個線程達到屏障時,屏障開放,全部被阻塞的線程會繼續運行CyclicBarrier(int parties)dom
CyclicBarrier(int parties, Runnable barrierAction),屏障開放,barrierAction定義的任務會執行ide
代碼示例:
import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread()); private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();//存放子線程工做結果的容器 public static void main(String[] args) { for(int i=0;i<=4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } //負責屏障開放之後的工做 private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } //工做線程 private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId();//線程自己的處理結果 resultMap.put(Thread.currentThread().getId()+"",id); Random r = new Random();//隨機決定工做線程的是否睡眠 try { if(r.nextBoolean()) { Thread.sleep(2000+id); System.out.println("Thread_"+id+" ....do something "); } System.out.println(id+"....is await"); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); } catch (Exception e) { e.printStackTrace(); } } } }
CountDownLatch和CyclicBarrier辨析
一、countdownlatch放行由第三者控制,CyclicBarrier放行由一組線程自己控制
二、countdownlatch放行條件》=線程數,CyclicBarrier放行條件=線程數
控制同時訪問某個特定資源的線程數量,用在流量控制
import java.sql.Connection; import java.util.LinkedList; import java.util.concurrent.Semaphore; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class DBPoolSemaphore { private final static int POOL_SIZE = 10; private final Semaphore useful,useless;//useful表示可用的數據庫鏈接,useless表示已用的數據庫鏈接 public DBPoolSemaphore() { this. useful = new Semaphore(POOL_SIZE); this.useless = new Semaphore(0); } //存放數據庫鏈接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } /*歸還鏈接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("當前有"+useful.getQueueLength()+"個線程等待數據庫鏈接!!" +"可用鏈接數:"+useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*從池子拿鏈接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection conn; synchronized (pool) { conn = pool.removeFirst(); } useless.release(); return conn; } }
兩個線程間的數據交換, 用的比較少
代碼示例:
import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Exchanger; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class UseExchange { private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>(); public static void main(String[] args) { //第一個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setA = new HashSet<String>();//存放數據的容器 try { /*添加數據 * set.add(.....) * */ setA = exchange.exchange(setA);//交換set /*處理交換後的數據*/ } catch (InterruptedException e) { } } }).start(); //第二個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setB = new HashSet<String>();//存放數據的容器 try { /*添加數據 * set.add(.....) * set.add(.....) * */ setB = exchange.exchange(setB);//交換set /*處理交換後的數據*/ } catch (InterruptedException e) { } } }).start(); } }
類之間的關係
isDone,結束,正常仍是異常結束,或者本身取消,返回true;
isCancelled 任務完成前被取消,返回true;
cancel(boolean):
代碼示例:
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */ public class UseFuture { /*實現Callable接口,容許有返回值*/ private static class UseCallable implements Callable<Integer>{ private int sum; @Override public Integer call() throws Exception { System.out.println("Callable子線程開始計算"); Thread.sleep(2000); for(int i=0;i<5000;i++) { sum = sum+i; } System.out.println("Callable子線程計算完成,結果="+sum); return sum; } } public static void main(String[] args) throws InterruptedException, ExecutionException { UseCallable useCallable = new UseCallable(); FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable); new Thread(futureTask).start(); Random r = new Random(); SleepTools.second(1); if(r.nextBoolean()) {//隨機決定是得到結果仍是終止任務 System.out.println("Get UseCallable result = "+futureTask.get()); }else { System.out.println("中斷計算"); futureTask.cancel(true); } } }
場景舉例:包含圖片和文字的文檔的處理:圖片(雲上),能夠用future去取圖片,主線程繼續解析文字。