Callable接口、Runable接口、Future接口

1. Callable與Runable區別java

Java從發佈的第一個版本開始就能夠很方便地編寫多線程的應用程序,並在設計中引入異步處理。Thread類、Runnable接口和Java內存管理模型使得多線程編程簡單直接。程序員

但Thread類和Runnable接口都不容許聲明檢查型異常,也不能定義返回值。沒有返回值這點稍微有點麻煩。不能聲明拋出檢查型異常則更麻煩一些。編程

public void run()方法契約意味着你必須捕獲並處理檢查型異常。即便你當心地保存了異常信息(在捕獲異常時)以便稍後檢查,但也不能保證這個類(Runnable對象)的全部使用者都讀取異常信息。安全

你也能夠修改Runnable實現的getter,讓它們都能拋出任務執行中的異常。但這種方法除了繁瑣也不是十分安全可靠,你不能強迫使用者調用這些方法,程序員極可能會調用join()方法等待線程結束而後就無論了。多線程

可是如今不用擔憂了,以上的問題終於在1.5中解決了。Callable接口和Future接口的引入以及他們對線程池的支持優雅地解決了這兩個問題。併發

無論用哪一種方式建立線程,其本質都是Callable接口與Runable接口。二者都是可被其它線程執行的任務!!區別是:less

(1)Callable規定的方法是call(),而Runnable規定的方法是run()。

(2)Callable的任務執行後可返回值,而Runnable的任務是不能返回值的。

(3)call()方法可拋出異常,而run()方法是不能拋出異常的。

(4)運行Callable任務可拿到一個Future對象。

2.Futuredom

如上所說,Callable任務返回Future對象。即:Callable和Future一個產生結果,一個拿到結果。異步

Future 表示異步計算的結果。Future接口中有以下方法:ide

  •     boolean cancel(boolean mayInterruptIfRunning)

取消任務的執行。參數指定是否當即中斷任務執行,或者等等任務結束

  •     boolean isCancelled() 

任務是否已經取消,任務正常完成前將其取消,則返回 true

  •     boolean isDone()

任務是否已經完成。須要注意的是若是任務正常終止、異常或取消,都將返回true

  •     V get()

等待任務執行結束,而後得到V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,若是任務被取消,還會拋出CancellationException

  •     V get(long timeout, TimeUnit unit) 

同上面的get功能同樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。若是計算超時,將拋出TimeoutException

Future接口提供方法來檢測任務是否被執行完,等待任務執行完得到結果。也能夠設置任務執行的超時時間,這個設置超時的方法就是實現Java程序執行超時的關鍵。

因此,若是須要設定代碼執行的最長時間,即超時,能夠用Java線程池ExecutorService類配合Future接口來實現。

三個簡單的小例子,體會一下:

package com.zyf.Future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetTimeOut1 {  
    public static void main(String[] args){  
        int timeout = 2;   
        ExecutorService executor = Executors.newSingleThreadExecutor();  
        Boolean result = false;     
        Future<Boolean> future = executor.submit(new TaskThread("發送請求"));//將任務提交給線程池
        try {     
            result = future.get(timeout, TimeUnit.SECONDS);
           // result = future.get(timeout, TimeUnit.MILLISECONDS); //1
            System.out.println("發送請求任務的返回結果: "+result);  //2
        } catch (InterruptedException e) {  
            System.out.println("線程中斷出錯。");  
            future.cancel(true);// 中斷執行此任務的線程     
        } catch (ExecutionException e) {     
            System.out.println("線程服務出錯。");  
            future.cancel(true);
        } catch (TimeoutException e) {// 超時異常     
            System.out.println("超時。");     
            future.cancel(true);  
        }finally{  
            System.out.println("線程服務關閉。");  
            executor.shutdown();  
        }  
    }  
      
    static class TaskThread implements Callable<Boolean> {    
        private String t;  
        public TaskThread(String temp){  
            this.t= temp;  
        }  
        public Boolean call() {  
            //for用於模擬超時
            for(int i=0;i<999999999;i++){  
                if(i==999999998){  
                    System.out.println(t+"成功!");  
                }  
                if (Thread.interrupted()){ //很重要  
                    return false;     
                }  
            }   
            System.out.println("繼續執行..........");     
            return true;     
        }     
    }   
}
package com.zyf.Future;

import java.util.concurrent.*;

public class FutureGetTimeOut2 {

    public static void main(String[] args) {
        final ExecutorService service = Executors.newFixedThreadPool(1);
        TaskThread taskThread = new TaskThread();
        System.out.println("提交任務...begin");
        Future<Object> taskFuture = service.submit(taskThread);
        System.out.println("提交任務...end");
        try {
            Object re = taskFuture.get(60000, TimeUnit.MILLISECONDS);// 超時設置
            System.out.println(re);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("超時 取消任務");
            taskFuture.cancel(true);
            System.out.println("超時 取消任務OK");
        } finally {
            service.shutdown();
        }
    }
}

class TaskThread implements Callable<Object> {
    public Object call() throws Exception {
        String result = "空結果";
        try {
            System.out.println("任務開始....");
            //修改sleep 的值測試超時
            Thread.sleep(500);
            result = "正確結果";
            System.out.println("任務結束....");
        } catch (Exception e) {
            System.out.println("Task is interrupted!");
        }
        return result;
    }
}
package com.zyf.Future;

import java.util.concurrent.*;

class MyCallable implements Callable<Object> {

    private int flag = 0;

    public MyCallable(int flag) {

        this.flag = flag;

    }

    public String call() throws Exception {

        if (this.flag == 0) {

            return "flag = 0";

        }

        if (this.flag == 1) {

            try {

                while (true) {

                    System.out.println("looping.");

                    Thread.sleep(2000);

                }

            } catch (InterruptedException e) {

                System.out.println("Interrupted");

            }

            return "false";

        } else {

            throw new Exception("Bad flag value!");

        }

    }

}

public class FutureGetBlock {
    
    public static void main(String[] args) {

        // 定義3個Callable類型的任務

        MyCallable task1 = new MyCallable(0);

        MyCallable task2 = new MyCallable(1);

        MyCallable task3 = new MyCallable(2);

        // 建立一個執行任務的服務

        ExecutorService es = Executors.newFixedThreadPool(3);

        try {

            // 提交併執行任務,任務啓動時返回了一個Future對象,

            // 若是想獲得任務執行的結果或者是異常可對這個Future對象進行操做

            Future<?> future1 = es.submit(task1);

            // 得到第一個任務的結果,若是調用get方法,當前線程會等待任務執行完畢後才往下執行

            System.out.println("task1: " + future1.get());

            Future<?> future2 = es.submit(task2);

            // 等待5秒後,再中止第二個任務。由於第二個任務進行的是無限循環

            Thread.sleep(5000);

            System.out.println("task2 cancel: " + future2.cancel(true));

            // 獲取第三個任務的輸出,由於執行第三個任務會引發異常

            // 因此下面的語句將引發異常的拋出

            Future<?> future3 = es.submit(task3);

            System.out.println("task3: " + future3.get());

        } catch (Exception e) {

            System.out.println(e.toString());

        }

        // 中止任務執行服務

        es.shutdownNow();

    }

}

 

3. Future實現類

3.1 FutureTask

FutureTask是一個RunnableFuture<V>,而RunnableFuture實現了Runnbale又實現了Futrue<V>這兩個接口,

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

另外它還能夠包裝Runnable和Callable<V>

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

 public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

能夠看到,Runnable會被Executors.callable()函數轉換爲Callable類型,即FutureTask最終都是執行Callable類型的任務。該適配函數的實現以下 :

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
/**
 * A callable that runs given task and returns given result
 */
static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

因爲FutureTask實現了Runnable,所以它既能夠經過Thread包裝來直接執行,也能夠提交給ExecuteService來執行。見下面兩個例子:

package com.zyf.Future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) {
        Callable<Integer> callable = new Callable<Integer>() {
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        };

        FutureTask<Integer> future = new FutureTask<Integer>(callable);
        new Thread(future).start();

        try {
            Thread.sleep(1000);// 可能作一些事情

            int result = future.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
package com.zyf.Future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo2 {

    static ExecutorService mExecutor = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        futureDemo();
    }

    static void futureDemo() {
        try {
            /**
             * 提交runnable則沒有返回值, future沒有數據
             */
            Future<?> future = mExecutor.submit(new Runnable() {

                @Override
                public void run() {
                    fibc(20);
                }
            });

            System.out.println("future result from runnable : " + future.get());

            /**
             * 提交Callable, 有返回值, future中可以獲取返回值
             */
            Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return fibc(20);
                }
            });

            System.out.println("future result from callable : " + result2.get());

            /**
             * FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個接口,
             * 另外它還能夠包裝Runnable(實際上會轉換爲Callable)和Callable
             * <V>,因此通常來說是一個符合體了,它能夠經過Thread包裝來直接執行,也能夠提交給ExecuteService來執行
             * ,而且還能夠經過v get()返回執行結果,在線程體沒有執行完成的時候,主線程一直阻塞等待,執行完則直接返回結果。
             */
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return fibc(20);
                }
            });
            // 提交futureTask
            mExecutor.submit(futureTask);
            System.out.println("future result from futureTask : " + futureTask.get());

            mExecutor.shutdown();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * 效率底下的斐波那契數列, 耗時的操做
     * 
     * @param num
     * @return
     */
    static int fibc(int num) {
        if (num == 0) {
            return 0;
        }
        if (num == 1) {
            return 1;
        }
        return fibc(num - 1) + fibc(num - 2);
    }

}

若是要執行多個帶返回值的任務,並取得多個返回值,兩種方法:

1.先建立一個裝Future類型的集合,用Executor提交的任務返回值添加到集合中,最後便利集合取出數據。

這時候,submit的task不必定是按照加入本身維護的list順序完成的。從list中遍歷的每一個Future對象並不必定處於完成狀態,這時調用get()方法就會被阻塞住。

若是系統是設計成每一個線程完成後就能根據其結果繼續作後面的事,這樣對於處於list後面的可是先完成的線程就會增長了額外的等待時間。

因此jdk1.8增長了Future接口的另一個實現類CompletionService

2.CompletionService至關於Executor加上BlockingQueue,使用場景爲當子線程併發了一系列的任務之後,主線程須要實時地取回子線程任務的返回值並同時順序地處理這些返回值,誰先返回就先處理誰。

而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,纔會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,若是Queue是空的,就會阻塞在那裏,直到有完成的Future對象加入到Queue中。

因此,先完成的一定先被取出。這樣就減小了沒必要要的等待時間。

相關文章
相關標籤/搜索