併發編程之Callable和Future接口、FutureTask類

    Callable接口表明一段能夠調用並返回結果的代碼;Future接口表示異步任務,是尚未完成的任務給出的將來結果。因此說Callable用於產生結果,Future用於獲取結果。java

    Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很類似,但它能夠返回一個對象或者拋出一個異常。其中Runnable能夠提交給Thread來包裝下,直接啓動一個線程來執行,而Callable則通常都是提交給ExecuteService來執行。Executor就是Runnable和Callable的調度容器,Future就是對於具體的調度任務的執行結果進行查看,最爲關鍵的是Future能夠檢查對應的任務是否已經完成,也能夠阻塞在get方法上一直等待任務返回結果。Runnable和Callable的差異就是Runnable是沒有結果能夠返回的,而且Runnable沒法拋出返回結果的異常,就算是經過Future也看不到任務調度的結果的。dom

    Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執行Callable內的任務。因爲 Callable任務是並行的(並行就是總體看上去是並行的,其實在某個時間點只有一個線程在執行),咱們必須等待它返回的結果。異步

    java.util.concurrent.Future對象爲咱們解決了這個問題。在線程池提交Callable任務後返回了一個Future對象,使用它能夠知道Callable任務的狀態和獲得Callable返回的執行結果。Future提供了get()方法讓咱們能夠等待Callable結束並 獲取它的執行結果。ide

Callable接口的源碼以下:函數

public interface Callable<V> {
    V call() throws Exception; // 計算結果
}

Future接口的源碼以下:this

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);// 試圖取消對此任務的執行

    boolean isCancelled();// 若是在任務正常完成前將其取消,則返回 true

    boolean isDone();// 若是任務已完成,則返回 true

    V get() throws InterruptedException, ExecutionException;// 若有必要,等待計算完成,而後獲取其結果

    // 若有必要,最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future用於表示異步計算的結果。它的實現類是FutureTask。spa

若是不想分支線程阻塞主線程,又想取得分支線程的執行結果,就用FutureTask
FutureTask實現了Runnable和Future接口,這個接口的定義以下:線程

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

能夠看到這個接口實現了Runnable和Future接口,接口中的具體實現由FutureTask來實現。這個類的兩個構造方法以下 :orm

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    sync = new Sync(callable);
}

public FutureTask(Runnable runnable, V result) {
    sync = new Sync(Executors.callable(runnable, result));
}

如上提供了兩個構造函數,一個以Callable爲參數,另一個以Runnable爲參數。這些類之間的關聯對於任務建模的辦法很是靈活,容許你基於 FutureTask的Runnable特性(由於它實現了Runnable接口),把任務寫成Callable,而後封裝進一個由執行者調度並在必要時 能夠取消的FutureTask。對象

FutureTask能夠由執行者調度,這一點很關鍵。它對外提供的方法基本上就是Future和Runnable接口的組合:get()、cancel、isDone()、isCancelled()和run(),而run()方法一般都是由執行者調用,咱們基本上不須要直接調用它。

下面來看一個FutureTask的例子,以下:

package test1;
import java.util.concurrent.*;

class MyCallable implements Callable<String> {
    private long waitTime;

    public MyCallable(int timeInMillis) {
        this.waitTime = timeInMillis;
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(waitTime);
        //return the thread name executing this callable task
        return Thread.currentThread().getName();
    }
}

public class FutureTaskExample {
    public static void main(String[] args) {
        // 要執行的任務
        MyCallable callable1 = new MyCallable(1000);
        MyCallable callable2 = new MyCallable(2000);
        // Callable寫的任務封裝到一個由執行者調度的FutureTask對象
        FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
        FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
        // 建立線程池並返回ExecutorService實例
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(futureTask1);  // 執行任務
        executor.execute(futureTask2);

        while (true) {
            try {
                //兩個任務都完成
                if (futureTask1.isDone() && futureTask2.isDone()) {
                    System.out.println("Done");
                    // 關閉線程池和服務
                    executor.shutdown();
                    return;
                }
                //任務1沒有完成,會等待,直到任務完成
                if (!futureTask1.isDone()) {
                    System.out.println("FutureTask1 output=" + futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if (s != null) {
                    System.out.println("FutureTask2 output=" + s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                //do nothing
            }
        }
    }
}

運行如上程序後,能夠看到一段時間內沒有輸出,由於get()方法等待任務執行完成而後才輸出內容. 

輸出結果以下:

FutureTask1 output=pool-1-thread-1

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

FutureTask2 output=pool-1-thread-2

Done

Callable和Future接口示例程序:該程序是計算一個公司的年銷售水泥的總 數目,每一行表明一個客戶,每一列表明一個客戶在每月內的購買數量,將每個客戶(每一行)看作一個小任務。每個任務計算以後放入Future中,等 待全部的計算完畢後,調用get(是一個阻塞方法,等待全部任務執行完畢)方法獲得結果並計算總和。

package test1;

import java.text.DateFormatSymbols;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AnnualSalesCalc {

    private static int NUMBER_OF_CUSTOMERS = 100;
    private static int NUMBER_OF_MONTHS = 12;
    private static int salesMatrix[][];

    private static class Summer implements Callable<Integer> {
        private int companyID;

        public Summer(int companyID) {
            this.companyID = companyID;
        }

        @Override
        public Integer call() {
            int sum = 0;
            for (int col = 0; col < NUMBER_OF_MONTHS; col++) {
                sum += salesMatrix[companyID][col];
            }
            System.out.printf("Totaling for client 1%02d completed%n",
                    companyID);
            return sum;
        }
    }

    private static void generateMatrix() {
        salesMatrix = new int[NUMBER_OF_CUSTOMERS][NUMBER_OF_MONTHS];
        for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
            for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
                salesMatrix[i][j] = (int) (Math.random() * 100);
            }
        }
    }

    private static void printMatrix() {
        System.out.print("\t\t");
        String[] monthDisplayNames = (new DateFormatSymbols()).getShortMonths();
        for (String strName : monthDisplayNames) {
            System.out.printf("%12s", strName);
        }
        System.out.println();
        for (int i = 0; i < monthDisplayNames.length - 1; i++) {
            System.out.print("=======");
        }
        System.out.println("====");
        for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
            System.out.printf("Client ID:1%02d%2s", i, "|");
            for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
                System.out.printf("%6d", salesMatrix[i][j]);
            }
            System.out.println();
        }
        System.out.println();
    }

    public static void main(String[] args) throws 
            InterruptedException, ExecutionException {
        generateMatrix();
        printMatrix();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Set<Future<Integer>> set = new HashSet<Future<Integer>>();
        for (int row = 0; row < NUMBER_OF_CUSTOMERS; row++) {
            Callable<Integer> callable = new Summer(row);
            Future<Integer> future = executor.submit(callable);
            set.add(future);
        }
        int sum = 0;
        for (Future<Integer> future : set) {
            sum += future.get();
        }
        System.out.printf("%nThe annual turnover (bags): %s%n%n", sum);
        executor.shutdown();
    }
}

部分結果:

                  一月          二月          三月          四月          五月          六月          七月          八月          九月          十月         十一月         十二月            

========================================================================================

Client ID:100 |    82    19    30    27    90    33    64    32    20    40    60    36

Client ID:101 |    19    99    14    26    87    86    26    22    51     2    75    57

Client ID:102 |    41    86    32    68    91    52     0    38    77    13    53     7

......

Client ID:197 |    68    93    72    72     8    68    10     6    90    11    81    78

Client ID:198 |    80    86    88    87    17    87    47    62    88    62    76    47

Client ID:199 |    10    38    74    36    75    17    31     4    48    92    43    59

Totaling for client 101 completed

Totaling for client 103 completed

Totaling for client 102 completed

......

Totaling for client 104 completed

Totaling for client 130 completed

Totaling for client 120 completed

The annual turnover (bags): 60275

FutureTask示例程序:演示股票交易程序,一個懶惰線程隨時能夠取消提交的任務,若是訂單已經完成取消失敗,若是任務正在執行且能夠中斷則取消該線程剩餘的處理流程,若是訂單已提交而且在分配給線程執行以前被取消,則訂單會取消成功。

package test1;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StocksOrderProcessor {
    public static final int MAX_NUMBER_OF_ORDERS = 1000;
    private static final ExecutorService executor = Executors
            .newFixedThreadPool(100);
    private static List<Future<Integer>> ordersToProcess = new ArrayList<Future<Integer>>();

    private static class OrderExecutor implements Callable<Integer> {
        private int id = 0;
        private int count = 0;

        public OrderExecutor(int id) {
            this.id = id;
        }

        @Override
        public Integer call() throws Exception {
            while (count < 50) {
                count++;
                Thread.sleep(new Random(System.currentTimeMillis() % 100)
                        .nextInt(10));
            }
            System.out.println("Successfully executed order: " + id);
            return id;
        }
    }

    private static void submitOrder(int id) {
        Callable<Integer> callable = new OrderExecutor(id);
        ordersToProcess.add(executor.submit(callable));
    }

    public static void main(String[] args) {
        System.out.printf("Submitting %d trades%n", MAX_NUMBER_OF_ORDERS);
        for (int i = 0; i < MAX_NUMBER_OF_ORDERS; i++) {
            submitOrder(i);
        }
        new Thread(new EvilThread(ordersToProcess)).start();
        System.out.println("Cancelling a few orders at random");
        try {
            // 爲了讓全部任務均可以完成,讓執行器等待30秒鐘
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Checking status before shutdown");
        int count = 0;
        for (Future<Integer> future : ordersToProcess) {
            if (future.isCancelled()) {
                count++;
            }
        }
        System.out.printf("%d trades cancelled%n", count);
        // shutdown方法並不意味着以前提交的任務被當即取消,而是發起任務順序中止以便以前提交的任務
        // 能夠被執行,可是他保證再也不接受新的任務。ExecuteExistingDelayedTaskAfterShutdownPolicy
        // 被設置爲false意味着現有未完成的任務會被取消,反之,不會被取消。
        // ContinueExistingPeriodicTasksAfterShutdownPolicy
        // 被設置爲true,那麼已有的週期性任務會被取消。
        executor.shutdown();
    }
}

class EvilThread implements Runnable {
    private List<Future<Integer>> ordersToProcess;

    public EvilThread(List<Future<Integer>> futures) {
        this.ordersToProcess = futures;
    }

    @Override
    public void run() {
        Random myNextKill = new Random(System.currentTimeMillis() % 100);
        for (int i = 0; i < 100; i++) {
            int index = myNextKill
                    .nextInt(StocksOrderProcessor.MAX_NUMBER_OF_ORDERS);
            boolean cancel = ordersToProcess.get(index).cancel(true);
            if (cancel == true) {
                System.out.println("Cancel Order Succeed: " + index);
            } else {
                System.out.println("Cancel Order Failed: " + index);
            }
            try {
                Thread.sleep(myNextKill.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

部分執行結果:

Submitting 1000 trades

Successfully executed order: 64

Cancelling a few orders at random

Cancel Order Succeed: 857

Cancel Order Succeed: 402

Cancel Order Succeed: 262

Cancel Order Succeed: 72

Successfully executed order: 62

Successfully executed order: 102

Cancel Order Succeed: 327

......

Cancel Order Failed: 539

相關文章
相關標籤/搜索