thread_fork/join併發框架1

一.線程併發控制  Thread、Executor、ForkJoin和Actor
  1.裸線程
      Runnable接口,調用.start()開始,沒有現成的API來結束線程,你須要本身來實現,經過相似boolean類型的標記來通信算法

     使用Runnable對象,而後建立對應的Thread對象來控制程序中這些線程的建立、執行以及線程的狀態多線程

  2.ExecutorFrameworkExecutor和ExecutorService接口:執行器框架將任務的建立和執行進行了分離,經過這個框架只須要實現Runnable接口的對象和使用Executor對象,而後將Runnable對象發送給執行器,執行器再負責這些任務鎖須要的線程,包括線程的建立管理已經線程的結束併發

  public interface Executor {
    void execute(Runnable command);
  }
  ExecutorService管理executor的生命週期,以及CompletionService會抽象掉更多細節,做爲已完成任務的隊列
  Executors.newFixedThreadPool(4)
   3.經過並行流,使用ForkJoinPool (FJP) 框架

     Fork/Join 分解/合併框架 dom

用來解決分治技術,將問題拆分紅若干小任務,在一個任務中,先檢查要解決任務的問題的大小,決定是否再拆分任務。若是不須要拆分任務了,就在當前任務中解決問題。而後根據須要返回任務的結果。異步

ForkJoinPool類看做一個特殊的Executor執行器類型,這個框架包括如下兩個操做:
 分解(Fork)操做:當須要將一個任務拆分紅更小的多個任務時,在框架中執行這些任務
 合併(Join)操做: 當一個主任務等待其建立的多個子任務的完成執行。
Fork/Join 和 ExecutorFramework 主要的區別在於 工做竊取算法(Work-StealingAlgroithms),使用Join操做讓一個主任務等待它所建立的子任務的完成,執行這個任務的線程稱之爲 工做者線程(worker Thread),工做者線程會尋找其餘任未被執行的任務,而後開始執行。 什麼是工做竊取算法?:就是指某個線程從其餘隊列裏竊取任務來執行。從而提高了性能。分佈式

爲了達到以上的效果,Fork/Join框架有如下幾點限制:
a. 任務只能使用fork() 和 join() 操做當同步機制。若是使用其餘的同步機制,工做線程就不能執行其餘的任務,固然這些任務是在同步操做裏時。好比在Fork/Join框架中將一個任務休眠,正在執行這個任務的工做者線程在休眠期內不能執行另外一個任務。
b. 任務不能執行I/O操做,好比文件數據的讀取與寫入
c. 任務不能拋出非運行時異常(Checke Exception),必須在代碼中處理這些異常ide

  4.actor模型中  Akka Actors
      actor模型中,一切都看作是一個actor。一個actor是一個計算實體,它能夠從其餘actor那裏接收消息。在應答消息時,它能夠給其餘actor發送消息,或者建立新的actor並與之交互,或者只改變本身的內部狀態。
      這是一個很是強大的概念。生命週期和消息傳遞由你的框架來管理,你只須要指定計算單元是什麼就能夠了。另外,actor模型強調避免全局狀態,這會帶來不少便利。你能夠應用監督策略,例如免費重試,更簡單的分佈式系統設計,錯誤容忍度等等。性能

二.簡單應用舉例測試

   轉自  http://blog.csdn.net/mr_zhuqiang/article/details/48300229

   Fork/Join框架的核心是由ForkJoinPool和ForkJoinTask組成

  ForkJoinPool : 這個類實現了 ExecutorServic接口和工做竊取算法。它管理工做者線程,並提供任務的狀態信息,以及任務的執行信息
  ForkJoinTask :是一個將在ForkJoinPool中執行的任務的基類
  Fork/Join框架提供了在一個任務裏執行fork和join操做的機制和控制任務狀態的方法,一般,爲了實現Fork/Join任務,須要實現如下兩個類之一的子類。

    RecursiveAction :用於任務沒有返回結果的場景,一個ForkJoinTask任務類,遞歸無結果的任務類。相似callable同樣的線程任務, 

    RecursiveTask :  用於任務有返回結果的場景

  下示例描述了,批量修改不少商品的價格,使用Fork/Join線程池 和 RecursiveAction(ForkJoinTask)來實現 遞歸的分配任務執行   

    public static void main(String[] args) throws InterruptedException {
        // 生成商品數據
        List<Product> list = new ArrayList<Product>();
        for (int i = 0; i < 40; i++) {
            Product p = new Product("蘋果" + i, 10);
            list.add(p);
        }
        ///////////////////////////////////////////////
        ForkJoinPool fjp = new ForkJoinPool();

        Task task = new Task(list, 0, list.size(), 19);
        fjp.execute(task);
        // fjp.shutdown(); //關閉線程池
        // fjp.awaitTermination(1, TimeUnit.MINUTES);
        //等待超時。結合shutdown來讓任務一完成就繼續執行下面的代碼

        // 使用循環的方式來查看任務的信息
        do {
            System.out.printf("活躍線程:%s,這一個參數  %s,並行執行的最大數量:%s\n", 
                    fjp.getActiveThreadCount(), 
                    fjp.getStealCount(),
                    fjp.getParallelism());

        } while (!task.isDone()); // 若是任務還未完成,則繼續獲取信息
        // 若是這個任務完成沒有拋出異常並無取消。
        if (task.isCompletedNormally()) { 
            System.out.println("main:任務完成");
        }
        
        System.out.println("main:------------------------------  打印任務結果");
        for (Product product : list) {
            int price = product.getPrice();
            String name = product.getName();
            if (price != 19) { // 結果不是所指望的。就打印出來
                System.out.println(name + "," + price);
            }
        }
        System.out.println("main:------------------------------  打印任務結束");
    }
}

class Product {
    private String name;
    private int price;

    public Product(String name, int price) {
        this.name = name;
        this.price = price;
    }

    public String getName() {
        return name;
    }
    ......
}

class Task extends RecursiveAction {
    private List<Product> list; // 全部任務
    private int start;          // 處理任務的開始索引
    private int end;            // 處理任務的結束索引
    private int price;          // 更改的價格

    public Task(List<Product> list, int start, int end, int price) {
        this.list = list;
        this.start = start;
        this.end = end;
        this.price = price;
    }

    @Override
    protected void compute() {
        if (end - start <= 10) { // 每一個task 只能處理10條數據。
            System.out.printf("起始:start:%s,end:%s\n", start, end);
            update();
        } else { // 多餘的數據,則須要分給更多的任務
            int middle = (end + start) / 2; // 由於是索引。因此須要開始和結尾相加,而後除以2 就能獲得
                                            // 兩個索引之間的數值
            Task task1 = new Task(list, start, middle, 19);
            Task task2 = new Task(list, middle, end, 19);
            
            System.out.printf("分析:middle:%s,start:%s,end:%s\n", middle, start, end); // 方便推算
            // 這裏把任務分紅了2半遞歸執行
 invokeAll(task1, task2); 
        }
    }

    // 根據給定的起始索引和結束索引更新結果
    private void update() {
        for (int i = start; i < end; i++) {
            Product product = list.get(i);
            product.setPrice(price);
 
            System.out.printf("%s,修改了價格,索引:%s,%s,%s\n",
             Thread.currentThread().getName(),
              i,product.getName(),product.getPrice() );
        }
    }
}

結果分析:
   起始信息:咱們有40個商品,每一個任務處理10個商品。恰好4個工做線程處理。
  分析信息:去中間索引,這個分析在商品數量不能被2整除的時候頗有用,在不能被2整除的狀況下,該示例任然會盡量的均衡分配任務的數量
工做原理
  invokeAll方法來執行一個主任務鎖建立的多個子任務,這個是一個同步的調用,主任務將等待子任務的完成,而後繼續執行(有多是結束),當這個主任務等待它的子任務時,執行這個主任務的工做者線程接收另外一個等待執行的任務並開始執行(並行),正由於有了這個行爲,因此說Fork/Join框架提供了一種比Runnable和Callable對象更加高效的任務管理機制。
  ForkJoinTask類的invokeAll方法是執行器框架ExecutorFramework和Fork/Join框架之間的主要差別之一。在執行器框架中。
  在執行器框架中:全部的任務必須發送給執行器
  在Fork/Join框架:線程池中包含了待執行方法的任務,任務的控制也是在線程池中進行的,咱們在task類中使用了invokeAll方法,task類繼承了RecursiveAction,而RecursiveAction類則繼承了ForkJoinTask.

.經常使用方法

1.fork join get

   fork()方法容許ForkJoinTask任務異步執行,也容許一個新的ForkJoinTask從存在的ForkJoinTask中被啓動。
   join()方法容許一個ForkJoinTask等待另外一個ForkJoinTask執行完成。

   fork()只會讓ForkJoinPool調度一個新的任務,而不會建立子虛擬機。

   RecursiveTask.join() : 也是用來獲取任務的合併結果

   RecursiveTask.get(long timeout,TimeUnit unit) : 該方法,是給定一個指定的超時時間,若是超時尚未返回結果則返回null 

   invokeAll(task1,task2): 是一個同步的方法,任務會被掛起,等待子任務發送到線程池中而且直到完成


2.RecursiveAction 和RecursiveTask

   RecursiveAction的實例表明執行沒有返回結果。
   RecursiveTask會有返回值。下面例子 返回值

public class ForkJoin2Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //生成隨機矩陣
        final int rows = 10000; // 矩陣行數
        final int cols = 10000; // 矩陣列數
        final int number = 5;   // 查找的數字
        long start = System.currentTimeMillis();
        MatrixMock mock = new MatrixMock(rows, cols, number); // 生成矩陣對象
        long end = System.currentTimeMillis();
        System.out.println("建立矩陣花費時間:" + (end - start));

        //執行任務
        ForkJoinPool pool = new ForkJoinPool();
        Task2 task = new Task2(mock, 0, rows, 5);

        start = System.currentTimeMillis();
        pool.execute(task);
        pool.shutdown();
        //
        pool.awaitTermination(1, TimeUnit.MILLISECONDS);
        System.out.println("線程搜索的結果是:" + task.get());

        end = System.currentTimeMillis();
        System.out.println("線程搜索時間是:" + (end - start));

        start = System.currentTimeMillis();
        int temp = 0;
        for (int i = 0; i < rows; i++) {
            int[] rs = mock.getRow(i);
            for (int row : rs) {
                if (5 == row) {
                    temp++;
                }
            }
        }
        end = System.currentTimeMillis();
        System.out.println("單線程搜索結果是:" + temp);
        System.out.println("單線程搜索時間是:" + (end - start));
    }
}

// 任務類。查找數字出現的次數
class Task2 extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private MatrixMock mock;
    private int start; // 查詢起始行索引
    private int end; // 查詢結束行索引
    private int num; // 要查找的數字

    public Task2(MatrixMock mock, int start, int end, int num) {
        this.mock = mock;
        this.start = start;
        this.end = end;
        this.num = num;
    }

    @Override
    protected Integer compute() {
        int result = 0;

        if (end - start < 100) { // 每一個任務最多負責5行數據
            result = this.search();
            // 適合矩陣小的時候 查看對比結果
            // System.out.printf("%s,搜索起始行是:%s-%s,搜索結果是:%s\n",Thread.currentThread().getName(),start,end,result);
        } else { // 不然則拆分紅兩個子任務
            int mid = (end + start) / 2;
            Task2 task1 = new Task2(mock, start, mid, num);
            Task2 task2 = new Task2(mock, mid, end, num);
            invokeAll(task1, task2);
            try {
                result = task1.get() + task2.get(); // 兩個結果相加,要想到 該框架的特性就是 遞歸
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    // 計算當前任務分配的行數
    private int search() {
        int result = 0;
        for (int i = start; i < end; i++) {
            int[] rows = mock.getRow(i);
            for (int row : rows) {
                if (num == row) {
                    result++;
                }
            }
        }
        return result;
    }
}

// 隨機矩陣
class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int cols, int number) {
        data = new int[size][cols];
        Random random = new Random();

        int counter = 0;
        // 用隨機數爲矩陣賦值。每生成一個字,就用它跟要查找的數字比較,進行比較。若是一致,就用計數器加1
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < cols; j++) {
                data[i][j] = random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }
        }
        // 用來驗證多線程查找的正確性
        System.out.printf("在矩陣中找到了數字:%d,%d次\n", number, counter);
        // 測試的時候,能夠放開此代碼,能打印出 矩陣分佈圖。固然須要矩陣10 * 10 比較小的收,控制檯才能裝得下
        // for (int i = 0; i < data.length; i++) {
        // for (int j = 0; j < data[i].length; j++) {
        // System.out.printf(data[i][j] + " | ");
        // }
        // System.out.println("");
        // }
    }

    public int[] getRow(int row) {
        if (row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}
相關文章
相關標籤/搜索