多線程(八)經常使用的線程模型

  在處理業務的時候,有時候須要根據狀況使用不一樣的線程處理模型來處理業務邏輯,這裏演示一下常見的線程模型使用技巧。html

一、Future模型java

  前面的章節中提到過Future模型,該模型一般在使用的時候須要結合Callable接口配合使用。Future:將來的、未來的,再結合Callable大概能夠明白其功能。緩存

  Future是把結果放在未來獲取,當前主線程並不急於獲取處理結果。容許子線程先進行處理一段時間,處理結束以後就把結果保存下來,當主線程須要使用的時候再向子線程索取。多線程

  Callable是相似於Runnable的接口,其中call方法相似於run方法,所不一樣的是run方法不能拋出受檢異常沒有返回值,而call方法則能夠拋出受檢異常並可設置返回值。二者的方法體都是線程執行體。框架

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;

   注意這裏,沒法拋出受檢異常不等於沒法捕獲線程中throws的異常。run方法執行體中拋出異常是能夠被捕獲的,前提是使用Future來處理,後面會有說明。dom

       若是有一種場景須要一個線程處理一段業務,處理結束以後主線程將會使用處理結果進行後續處理。這樣,按照普通邏輯,就須要使用到一個全局變量來保存子線程處理以後的結果。子線程處理結束以後,把結果保存在全局變量中供主線程進行調用。一旦涉及到全局能量便存在着多線程讀寫全局變量錯誤的風險。而使用Future模式即可以省去全局變量的使用,直接從線程中獲取子線程處理結果。下面看一下使用示例;ide

package thread.blogs.threadmodel;

/**
 * Created by PerkinsZhu on 2017/9/1 15:34.
 */
public class AbstractModel {
    protected static void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    protected static void println(Object info) {
        System.out.println(info);
    }
}

 

package thread.blogs.threadmodel;

import java.util.concurrent.*;

/**
 * Created by PerkinsZhu on 2017/9/1 15:32.
 */
public class FutureModel extends AbstractModel {
    public static void main(String[] args) {
        testFuture();
    }

    /**
     * 區別: CallAble  能夠有返回值  能夠拋出受檢異常
     * Runnable  沒有返回值   沒法拋出受檢異常但可捕獲線程中發生的異常。
     * 者均可經過對future.get()進行try cathch捕獲異常
     */
    private static void testFuture() {
        MyCallable myCallable = new MyCallable();
        MyRunnable myRunnable = new MyRunnable();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<?> future = executorService.submit(myCallable);
        sleep(2000);
        try {
            //String data = future.get(2000, TimeUnit.MILLISECONDS);//能夠指定超時時間
            Object data = future.get();//當執行Runnable的時候,這裏返回的爲nul。此時若是有run方法體中有異常異常拋出,能夠在此捕獲到,雖然Run方法沒有顯示的拋出受檢異常。
            println(data + "---" + data.getClass().toString());
        } catch (InterruptedException e) {
            println(e.getMessage());
        } catch (ExecutionException e) {
            println(e.getMessage());
        } catch (Exception e) {
            println(e.getMessage());
        }
        executorService.shutdown();
    }

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            sleep(500);
            println("I am Callable...");
            //int num = 10/0;
            //throw  new RuntimeException("異常");
            return "hello";
        }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {//不支持返回值,沒法對線程捕獲異常。
            sleep(500);
            println("I am Runnable...");
            // int num = 10/0;
            //throw  new RuntimeException("異常");
        }
    }
}

 能夠取消註釋 分別測試 myCallable  和myRunnable 對異常捕獲和結果獲取進行測試。post

二、fork&join 模型
        該模型是jdk中提供的線程模型。該模型包含遞歸思想和回溯思想,遞歸用來拆分任務,回溯用合併結果。 能夠用來處理一些能夠進行拆分的大任務。其主要是把一個大任務逐級拆分爲多個子任務,而後分別在子線程中執行,當每一個子線程執行結束以後逐級回溯,返回結果進行彙總合併,最終得出想要的結果。這裏模擬一個摘蘋果的場景:有100棵蘋果樹,每棵蘋果樹有10個蘋果,如今要把他們摘下來。爲了節約時間,規定每一個線程最多隻能摘10棵蘋樹以便於節約時間。各個線程摘完以後彙總計算總蘋果樹。代碼實現以下:測試

package thread.blogs.threadmodel;

import scala.Console;

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by PerkinsZhu on 2017/9/5 13:05.
 */
public class ForkJoin {
    public static void main(String[] args) {
        testAcation();
    }

    private static void testAcation() {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> future = pool.submit(new ResultTask(100));//共100棵蘋果樹
        try {
            Console.println(future.get());
            pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}

class ResultTask extends RecursiveTask<Integer> { //也可繼承自RecursiveAction抽象類,區別在於compute方法沒有返回值,若是隻須要執行動做則可使用該接口
    private int treeNum;

    public ResultTask(int num) {
        this.treeNum = num;
    }

    @Override
    protected Integer compute() {
        if (treeNum < 10) {//每一個線程最多隻能摘10棵蘋果樹
            return getAppleNum(treeNum);
        } else {

            //對任務進行拆分,注意這裏不只僅能夠一分爲二進行拆分,也能夠拆爲多個子任務
            int temp = treeNum / 2;
            ResultTask left = new ResultTask(temp);
            ResultTask right = new ResultTask(treeNum - temp);
            left.fork();
            right.fork();
            //對子任務處理的結果進行合併
            int result = left.join() + right.join();

            Console.println("========" + Thread.currentThread().getName() + "=========" + result);
            return result;
        }
    }

    public Integer getAppleNum(int treeNum) {
        return treeNum * 10;//每棵樹上10個蘋果
    }
}

 這裏須要看一下執行結果,主要是爲了明白在拆分子任務的時候並非無限制開啓線程,而是使用了線程池ForkJoinPool複用線程。注意下面輸出的線程名稱!ui

========ForkJoinPool-1-worker-3=========120
========ForkJoinPool-1-worker-7=========120
========ForkJoinPool-1-worker-0=========120
========ForkJoinPool-1-worker-5=========120
========ForkJoinPool-1-worker-1=========130
========ForkJoinPool-1-worker-11=========130
========ForkJoinPool-1-worker-4=========250
========ForkJoinPool-1-worker-7=========130
========ForkJoinPool-1-worker-7=========250
========ForkJoinPool-1-worker-3=========130
========ForkJoinPool-1-worker-5=========250
========ForkJoinPool-1-worker-6=========250
========ForkJoinPool-1-worker-2=========500
========ForkJoinPool-1-worker-3=========500
========ForkJoinPool-1-worker-1=========1000
1000

  三、actor消息模型

  actor模型屬於一種基於消息傳遞機制並行任務處理思想,它以消息的形式來進行線程間數據傳輸,避免了全局變量的使用,進而避免了數據同步錯誤的隱患。actor在接受到消息以後能夠本身進行處理,也能夠繼續傳遞(分發)給其它actor進行處理。在使用actor模型的時候須要使用第三方Akka提供的框架點擊查看。這裏使用scala進行演示,若是須要看java使用方法則能夠查閱官方文檔:actor for java 使用。

package thread.blogs.threadmodel

import akka.actor.{Actor, ActorSystem, Props}

/**
  * Created by PerkinsZhu on 2017/9/21 18:58. 
  */
object ActorTest {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("MyActor")
    val actor = actorSystem.actorOf(Props[MyActor], "MyActor")
    actor ! "很高興認識你!"//發送消息給actor
  }
}

class MyActor extends Actor {
  override def receive: Receive = {//接收消息,根據消息類型進行case匹配,能夠在此actor進行處理,也能夠繼續傳遞給其它actor進行處理(參考master-worker)。
    case str: String => println(str)
  }
}

 四、生產者消費者模型

  生產者消費者模型都比較熟悉,其核心是使用一個緩存來保存任務。開啓一個/多個線程來生產任務,而後再開啓一個/多個來從緩存中取出任務進行處理。這樣的好處是任務的生成和處理分隔開,生產者不須要處理任務,只負責向生成任務而後保存到緩存。而消費者只須要從緩存中取出任務進行處理。使用的時候能夠根據任務的生成狀況和處理狀況開啓不一樣的線程來處理。好比,生成的任務速度較快,那麼就能夠靈活的多開啓幾個消費者線程進行處理,這樣就能夠避免任務的處理響應緩慢的問題。使用示例以下:

package thread.blogs.threadmodel;

import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;

/**
 * Created by PerkinsZhu on 2017/9/22 8:58.
 */
public class PCModel {
    public static void main(String[] args) {
        testPCModel();
    }
    private static Queue<String> queue = new LinkedList<String>();//任務緩存,這裏保存簡單的字符串模擬任務
    private static void testPCModel() {
        new Thread(() -> {//生產者線程
            while (true) {
                String uuid = UUID.randomUUID().toString();
                queue.add(uuid);
                sleep(100);
            }
        }).start();
        for (int i = 0; i < 10; i++) {//開啓10消費者處理任務,保證生產者產生的任務可以被及時處理
            new Thread(() -> {
                while (true) {
                    doWork(queue);
                }
            }).start();
        }
    }

    private static void doWork(Queue<String> queue) {
        sleep(1000);
        synchronized (queue) {
            if (queue.size() > 0) {
                sleep(10);
                System.out.println(queue.poll() + "----" + queue.size());
            }
        }
    }
    private static void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 五、master-worker模型

  master-worker模型相似於任務分發策略,開啓一個master線程接收任務,而後在master中根據任務的具體狀況進行分發給其它worker子線程,而後由子線程處理任務。如需返回結果,則worker處理結束以後把處理結果返回給master。下面的代碼示例是使用akka actor for scala演示。使用的時候也可使用java Thread來實現該模型。

package thread.blogs.threadmodel

import akka.actor.{Actor, ActorSystem, Props}

/**
  * Created by PerkinsZhu on 2017/9/21 18:58. 
  */
object ActorTest {
  val actorSystem = ActorSystem("Master")


  def main(args: Array[String]): Unit = {
    val actor = actorSystem.actorOf(Props[Master], "Master")
    var taskNum = 0;
    while (true) {
      taskNum = taskNum + 1;
      actor ! Task("作做業!  --" + taskNum) //發送消息給actor
      Thread.sleep(100)
    }
  }
}

class Master extends Actor {
  val actorSystem = ActorSystem("worker")
  var num = 0;

  override def receive: Receive = {
    case task: Task => {
      num = num + 1;
      //接收到任務以後分發給其它worker線程。可使用worker池 複用actor
      actorSystem.actorOf(Props[Worker], "worker" + num) ! task
    }
    case any: Any => println(any)
  }
}

class Worker extends Actor {

  def doWork(task: Task): Unit = println(task.name)

  override def receive: Receive = {
    case task: Task => doWork(task) //worker處理接受到的任務
    case any: Any => println(any)
  }
}

case class Task(name: String)

  這裏若是須要worker返回處理結果,則只須要在worker中調用sender 發送處理結果便可。

   

=========================================

原文連接:多線程(八)經常使用的線程模型轉載請註明出處!

=========================================

---end

相關文章
相關標籤/搜索