/** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception;
package thread.blogs.threadmodel; /** * Created by PerkinsZhu on 2017/9/1 15:34. */ public class AbstractModel { protected static void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } protected static void println(Object info) { System.out.println(info); } }
package thread.blogs.threadmodel; import java.util.concurrent.*; /** * Created by PerkinsZhu on 2017/9/1 15:32. */ public class FutureModel extends AbstractModel { public static void main(String[] args) { testFuture(); } /** * 區別: CallAble 能夠有返回值 能夠拋出受檢異常 * Runnable 沒有返回值 沒法拋出受檢異常但可捕獲線程中發生的異常。 * 者均可經過對future.get()進行try cathch捕獲異常 */ private static void testFuture() { MyCallable myCallable = new MyCallable(); MyRunnable myRunnable = new MyRunnable(); ExecutorService executorService = Executors.newFixedThreadPool(5); Future<?> future = executorService.submit(myCallable); sleep(2000); try { //String data = future.get(2000, TimeUnit.MILLISECONDS);//能夠指定超時時間 Object data = future.get();//當執行Runnable的時候,這裏返回的爲nul。此時若是有run方法體中有異常異常拋出,能夠在此捕獲到,雖然Run方法沒有顯示的拋出受檢異常。 println(data + "---" + data.getClass().toString()); } catch (InterruptedException e) { println(e.getMessage()); } catch (ExecutionException e) { println(e.getMessage()); } catch (Exception e) { println(e.getMessage()); } executorService.shutdown(); } static class MyCallable implements Callable<String> { @Override public String call() throws Exception { sleep(500); println("I am Callable..."); //int num = 10/0; //throw new RuntimeException("異常"); return "hello"; } } static class MyRunnable implements Runnable { @Override public void run() {//不支持返回值,沒法對線程捕獲異常。 sleep(500); println("I am Runnable..."); // int num = 10/0; //throw new RuntimeException("異常"); } } }
能夠取消註釋 分別測試 myCallable 和myRunnable 對異常捕獲和結果獲取進行測試。post
二、fork&join 模型
該模型是jdk中提供的線程模型。該模型包含遞歸思想和回溯思想,遞歸用來拆分任務,回溯用合併結果。 能夠用來處理一些能夠進行拆分的大任務。其主要是把一個大任務逐級拆分爲多個子任務,而後分別在子線程中執行,當每一個子線程執行結束以後逐級回溯,返回結果進行彙總合併,最終得出想要的結果。這裏模擬一個摘蘋果的場景:有100棵蘋果樹,每棵蘋果樹有10個蘋果,如今要把他們摘下來。爲了節約時間,規定每一個線程最多隻能摘10棵蘋樹以便於節約時間。各個線程摘完以後彙總計算總蘋果樹。代碼實現以下:測試
package thread.blogs.threadmodel; import scala.Console; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; /** * Created by PerkinsZhu on 2017/9/5 13:05. */ public class ForkJoin { public static void main(String[] args) { testAcation(); } private static void testAcation() { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> future = pool.submit(new ResultTask(100));//共100棵蘋果樹 try { Console.println(future.get()); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } pool.shutdown(); } } class ResultTask extends RecursiveTask<Integer> { //也可繼承自RecursiveAction抽象類,區別在於compute方法沒有返回值,若是隻須要執行動做則可使用該接口 private int treeNum; public ResultTask(int num) { this.treeNum = num; } @Override protected Integer compute() { if (treeNum < 10) {//每一個線程最多隻能摘10棵蘋果樹 return getAppleNum(treeNum); } else { //對任務進行拆分,注意這裏不只僅能夠一分爲二進行拆分,也能夠拆爲多個子任務 int temp = treeNum / 2; ResultTask left = new ResultTask(temp); ResultTask right = new ResultTask(treeNum - temp); left.fork(); right.fork(); //對子任務處理的結果進行合併 int result = left.join() + right.join(); Console.println("========" + Thread.currentThread().getName() + "=========" + result); return result; } } public Integer getAppleNum(int treeNum) { return treeNum * 10;//每棵樹上10個蘋果 } }
========ForkJoinPool-1-worker-3=========120 ========ForkJoinPool-1-worker-7=========120 ========ForkJoinPool-1-worker-0=========120 ========ForkJoinPool-1-worker-5=========120 ========ForkJoinPool-1-worker-1=========130 ========ForkJoinPool-1-worker-11=========130 ========ForkJoinPool-1-worker-4=========250 ========ForkJoinPool-1-worker-7=========130 ========ForkJoinPool-1-worker-7=========250 ========ForkJoinPool-1-worker-3=========130 ========ForkJoinPool-1-worker-5=========250 ========ForkJoinPool-1-worker-6=========250 ========ForkJoinPool-1-worker-2=========500 ========ForkJoinPool-1-worker-3=========500 ========ForkJoinPool-1-worker-1=========1000 1000
actor模型屬於一種基於消息傳遞機制並行任務處理思想,它以消息的形式來進行線程間數據傳輸,避免了全局變量的使用,進而避免了數據同步錯誤的隱患。actor在接受到消息以後能夠本身進行處理,也能夠繼續傳遞(分發)給其它actor進行處理。在使用actor模型的時候須要使用第三方Akka提供的框架點擊查看。這裏使用scala進行演示,若是須要看java使用方法則能夠查閱官方文檔:actor for java 使用。
package thread.blogs.threadmodel import akka.actor.{Actor, ActorSystem, Props} /** * Created by PerkinsZhu on 2017/9/21 18:58. */ object ActorTest { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("MyActor") val actor = actorSystem.actorOf(Props[MyActor], "MyActor") actor ! "很高興認識你!"//發送消息給actor } } class MyActor extends Actor { override def receive: Receive = {//接收消息,根據消息類型進行case匹配,能夠在此actor進行處理,也能夠繼續傳遞給其它actor進行處理(參考master-worker)。 case str: String => println(str) } }
package thread.blogs.threadmodel; import java.util.LinkedList; import java.util.Queue; import java.util.UUID; /** * Created by PerkinsZhu on 2017/9/22 8:58. */ public class PCModel { public static void main(String[] args) { testPCModel(); } private static Queue<String> queue = new LinkedList<String>();//任務緩存,這裏保存簡單的字符串模擬任務 private static void testPCModel() { new Thread(() -> {//生產者線程 while (true) { String uuid = UUID.randomUUID().toString(); queue.add(uuid); sleep(100); } }).start(); for (int i = 0; i < 10; i++) {//開啓10消費者處理任務,保證生產者產生的任務可以被及時處理 new Thread(() -> { while (true) { doWork(queue); } }).start(); } } private static void doWork(Queue<String> queue) { sleep(1000); synchronized (queue) { if (queue.size() > 0) { sleep(10); System.out.println(queue.poll() + "----" + queue.size()); } } } private static void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
master-worker模型相似於任務分發策略,開啓一個master線程接收任務,而後在master中根據任務的具體狀況進行分發給其它worker子線程,而後由子線程處理任務。如需返回結果,則worker處理結束以後把處理結果返回給master。下面的代碼示例是使用akka actor for scala演示。使用的時候也可使用java Thread來實現該模型。
package thread.blogs.threadmodel import akka.actor.{Actor, ActorSystem, Props} /** * Created by PerkinsZhu on 2017/9/21 18:58. */ object ActorTest { val actorSystem = ActorSystem("Master") def main(args: Array[String]): Unit = { val actor = actorSystem.actorOf(Props[Master], "Master") var taskNum = 0; while (true) { taskNum = taskNum + 1; actor ! Task("作做業! --" + taskNum) //發送消息給actor Thread.sleep(100) } } } class Master extends Actor { val actorSystem = ActorSystem("worker") var num = 0; override def receive: Receive = { case task: Task => { num = num + 1; //接收到任務以後分發給其它worker線程。可使用worker池 複用actor actorSystem.actorOf(Props[Worker], "worker" + num) ! task } case any: Any => println(any) } } class Worker extends Actor { def doWork(task: Task): Unit = println(task.name) override def receive: Receive = { case task: Task => doWork(task) //worker處理接受到的任務 case any: Any => println(any) } } case class Task(name: String)
這裏若是須要worker返回處理結果,則只須要在worker中調用sender 發送處理結果便可。