任務&&線程的取消與關閉機制_中斷

任務&&線程的取消與關閉機制_中斷java

任務和線程的啓動很容易。但要使任務和線程能安全、快速、可靠地中止下來,並非一件容易的事。Java沒有提供任何機制來安全地終止線程。但它提供了中斷(Interruption),這是一種協做機制,可以使一個線程終止另外一個線程的當前工做。安全

這種協做方式是很必要的,咱們不多但願某個任務、線程或服務當即中止,由於這種當即中止會使共享的數據結構處於不一致的狀態。相反,在編寫任務和服務時可使用一種協做的方式:當須要中止時,他們首先會清除當前正在執行的工做,而後在結束。這提供了更好的靈活性。由於任務自己的代碼比發出取消請求的代碼更清楚如何執行清楚操做。數據結構

 

使用「已請求取消」標誌取消關閉任務和線程

看代碼演示併發

package sync;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/**
 */
public class PrimeGenerator implements Runnable {

    private final List<BigInteger> primes = new ArrayList<BigInteger>();


    private volatile boolean cancelled;//經過標誌位取消線程任務的執行

    @Override
    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes); //注意這裏要從新new一個對象,防止外部修改
    }


    public static void main(String args[]) throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try {
            Thread.sleep(1000);
        } finally {
            generator.cancel();
        }
        List<BigInteger> ps = generator.get();
        for (BigInteger b : ps) {
            System.out.println(b);
        }
        return;
    }
}

PrimeGenerator 採用了一種簡單的取消策略:客戶代碼經過調用cancel來請求取消,PrimeGenerator 在每次搜索素數前首先檢查是否存在取消請求,若是存在則退出。less

PrimeGenerator 中的取消機制最終會使搜索素數的任務退出,但在退出過程當中須要花費必定的時間。然而,若是使用了這種方法的任務調用了一個阻塞方法,例如BlockingQueue.put, 那麼可能會產生一個更嚴重的問題——任務可能永遠不會檢查取消標誌,所以永遠不會結束。異步

 

經過中斷取消關閉任務和線程

線程中斷是一種協做機制,線程能夠經過這種機制來通知另外一個線程,告訴他在合適的或者可能的狀況下中止當前工做,並轉而執行其餘的工做。經過中斷並不能直接終止另外一個線程,而須要被中斷的線程本身處理中斷。ide

這比如是家裏的父母叮囑在外的子女要注意身體,但子女是否注意身體,怎麼注意身體則徹底取決於本身。this

‍‍‍‍‍‍‍每個線程都有一個boolean類型的中斷狀態。當中斷線程時,這個線程的中斷狀態將被設置爲true。‍‍‍‍‍‍在Thread中包含了中斷線程以及查詢線程中斷狀態的方法。spa

Java中斷模型也是這麼簡單,每一個線程對象裏都有一個boolean類型的標識(不必定就要是Thread類的字段,實際上也的確不是,這幾個方法最終都是經過native方法來完成的),表明着是否有中斷請求(該請求能夠來自全部線程,包括被中斷的線程自己)。例如,當線程t1想中斷線程t2,只須要在線程t1中將線程t2對象的中斷標識置爲true,而後線程2能夠選擇在合適的時候處理該中斷請求,甚至能夠不理會該請求,就像這個線程沒有被中斷同樣。線程

Thread中的中斷方法:

public void interrupt():能中斷目標線程(interrupt方法僅僅只是將中斷狀態置爲true)

public boolean isInterrupted():返回目標線程的中斷狀態

public static boolean interrupted():將清除當前線程的中斷狀態,這也是清除中斷狀態的惟一方法

interrupt方法是惟一能將中斷狀態設置爲true的方法。靜態方法interrupted會將當前線程的中斷狀態清除。

/**
 * Tests whether the current thread has been interrupted.  The
 * <i>interrupted status</i> of the thread is cleared by this method.  In
 * other words, if this method were to be called twice in succession, the
 * second call would return false (unless the current thread were
 * interrupted again, after the first call had cleared its interrupted
 * status and before the second call had examined it).
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return  <code>true</code> if the current thread has been interrupted;
 *          <code>false</code> otherwise.
 * @see #isInterrupted()
 * @revised 6.0
 */
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}
/**
 * Tests whether this thread has been interrupted.  The <i>interrupted
 * status</i> of the thread is unaffected by this method.
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return  <code>true</code> if this thread has been interrupted;
 *          <code>false</code> otherwise.
 * @see     #interrupted()
 * @revised 6.0
 */
public boolean isInterrupted() {
    return isInterrupted(false);
}

阻塞庫方法,例如Thread.sleep()和Object.wait()等,都會檢查線程什麼時候中斷,而且在發現中斷時提早返回。它們在響應中斷時執行的操做包括:清除中斷狀態,拋出InterruptedException,表示阻塞操做因爲中斷而提早結束。‍JVM並不能保證阻塞方法檢測到中斷的速度,可是實際狀況下響應的速度仍是很是快的。

 

當線程在非阻塞狀態下中斷時,他的中斷狀態將被設置,而後根據將被取消的操做來檢查中斷狀態以判斷髮生了中斷。經過這樣的方法,中斷操做將變得「有黏性」——若是不觸發InterruptException,那麼中斷狀態將一直保持,直到明確地清除中斷狀態。

調用interrupt並不意味着當即中止目標線程正在進行的工做,而只是傳遞了請求中斷的消息。

一般,中斷是實現取消的最合理方式。‍使用中斷來取消任務和線程的示例

package sync;

import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 */
public class PrimeProducer extends Thread {

    private final BlockingQueue<BigInteger> queue;

    public PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted()) {
                queue.put(p = p.nextProbablePrime());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }
    
    // 中斷線程,將中斷標誌位設爲 true
    public void cancel() {
        interrupt();
    }

    public synchronized BigInteger[] get() {
        return queue.toArray(new BigInteger[queue.size()]);
    }

    public static void main(String args[]) throws InterruptedException {
        BlockingQueue q = new ArrayBlockingQueue(500);
        PrimeProducer primeProducer = new PrimeProducer(q);
        primeProducer.start();
        try {
            Thread.sleep(2000);    //使當前線程等待2秒
        } finally {
            primeProducer.cancel();    //中斷線程
        }

        if (primeProducer.isInterrupted()) {
            BigInteger[] bs = primeProducer.get();
            for (BigInteger b : bs) {
                System.out.println(b);
            }
        }
    }
}

在每次迭代循環中,有兩個位置能夠檢測出中斷:在阻塞的put方法調用中,以及在循環開始處查詢中斷狀態時。因爲調用了阻塞的put方法(線程內部在執行時能夠檢查中斷狀態這個值,來獲知此線程是否應該結束了),所以這裏並不必定須要進行顯式的檢測,但執行檢測會使PrimeProducer對中斷具備較高的響應性,由於他是在啓動尋找素數任務以前檢查中斷的,而不是在任務完成以後。若是可中斷的阻塞方法的調用頻率並不高,不足以得到足夠的響應性,那麼顯式的檢測中斷狀態能起到必定的幫助做用。

 

可以設置中斷狀態的方法

除了 Thread.interrupt() 方法之外,下列 JDK 中的方法也會設置中斷(也是經過調用 Thread.interrupt() 來實現的):

  • FutureTask.cancel()

  • ExecutorService.shutdownNow() 這個方法會調用線程池中全部線程的中斷方法,不論它們是空閒的仍是運行中的

  • ExecutorService.shutdown() 方法只能中斷空閒的線程

上面只是舉兩個 JDK 中應用到了線程中斷的例子,這樣的例子還有不少,就不一一列舉了。固然,爲了能響應中斷,在你所寫的 Runnable 或 Callable 代碼中,必須經過 Thread.isInterrupted()、Thread.interrupted() 方法,或者捕獲 InterruptedException 等的中斷異常來發現線程中斷並處理,不然線程是不會自行提早結束的。

 

中斷的響應和處理

InterruptedException 是最多見的中斷表現形式。因此如何處理 InterruptedException 便成爲 Java 中斷知識中的必修課。處理 InterruptedException 可有如下幾種方式:

直接向上拋出

將異常不作任何處理,直接拋向該方法的調用者

public class TaskQueue {
    private static final int MAX_TASKS = 1000;
 
    private BlockingQueue<Task> queue 
        = new LinkedBlockingQueue<Task>(MAX_TASKS);
 
    public void putTask(Task r) throws InterruptedException { 
        queue.put(r);
    }
 
    public Task getTask() throws InterruptedException { 
        return queue.take();
    }
}

 

在 catch 中作處理後在拋出

由於 InterruptedException 的拋出,會打斷方法執行,使正在進行的工做只完成一部分。在有些狀況下,你就須要進行諸如回滾的處理。因此在這種狀況便須要在 catch 塊中進行處理以後在向上拋出 InterruptedException。

public class PlayerMatcher {
    private PlayerSource players;
 
    public PlayerMatcher(PlayerSource players) { 
        this.players = players; 
    }
 
    public void matchPlayers() throws InterruptedException { 
        try {
             Player playerOne, playerTwo;
             while (true) {
                 playerOne = playerTwo = null;
                 // Wait for two players to arrive and start a new game
                 playerOne = players.waitForPlayer(); // could throw IE
                 playerTwo = players.waitForPlayer(); // could throw IE
                 startNewGame(playerOne, playerTwo);
             }
         }
         catch (InterruptedException e) {  
             // If we got one player and were interrupted, put that player back
             if (playerOne != null)
                 players.addFirst(playerOne);
             // Then propagate the exception
             throw e;
         }
    }
}

 

不拋出InterruptedException時要恢復中斷

不少時候,因爲你所實現的接口定義的限制,你極可能沒法拋出 InterruptedException。‍例如實現 Runnable 接口以編寫業務代碼。這時,你就沒法再向上拋出 InterruptedException 了。‍此時你應該使用 Thread.currentThread().interrupt() 方法去恢復中斷狀態。由於阻塞方法在拋出 InterruptedException 時會清除當前線程的中斷狀態,若是此時不恢復中斷狀態,也不拋出 InterruptedException,那中斷信息便會丟失,上層調用者也就沒法得知中斷的發生。這樣便有可能致使任務沒法正確終止的狀況方式。

public class TaskRunner implements Runnable {
    private BlockingQueue<Task> queue;
 
    public TaskRunner(BlockingQueue<Task> queue) { 
        this.queue = queue; 
    }
 
    public void run() { 
        try {
             while (true) {
                 Task task = queue.take(10, TimeUnit.SECONDS);
                 task.execute();
             }
         }
         catch (InterruptedException e) { 
             // Restore the interrupted status
             // 當拋出中斷異常後,若是此時不能向上拋出,則須要恢復中斷狀態,不然中斷狀態會丟失
             Thread.currentThread().interrupt();
         }
    }
}

在這裏,咱們要清楚中斷的意義在於併發或異步場景下任務的終止。因此,若是你的代碼在吞掉 InterruptedException 而不拋出時並不會形成任務沒法被正確終止的狀況方式,那也能夠再也不恢復中斷。

仍是 Runnable 的例子,你們都知道 Runnable 不少時候是用在線程池中,由線程池提供線程來運行。而且一般是做爲任務的頂層容器來使用的,也就是說在線程池和 Runnable 實現之間,沒有別的調用層了。那麼在 try-catch InterruptedException 以後,即可不用在恢復線程中斷了(固然,若是有回滾等的需求,仍是須要實現的)。由於經過異常的捕獲,你已經能夠正確終止線程了。

可是,若是不是上述狀況,你所寫的,捕獲 InterruptedException 的方法會被其它的、非線程池類的方法調用。‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍例若有 A, B 兩個方法,A 被 B 方法調用,A 中捕獲 InterruptedException 後沒有恢復線程中斷,而 B 方法是一個無限循環,經過檢查線程中斷來退出,或者在調用 A 以後有個阻塞的方法。那便會形成線程沒法按照指望被終止的狀況發生。‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍

 

本身拋出InterruptedException

有時候,你須要「無中生有」地創造出一個 InterruptedException 以表示中斷的發生。固然,在這個時候,你也須要使用 Thread.isInterrupted() 或 Thread.interrupted() 來檢測中斷的發生。其實看了前面的部分咱們知道,拋出 InterruptedException 時,線程中斷狀態須要被清除()。這就是 Thread.interrupted() 的做用。‍其實,你要是看 JDK 源代碼,就會發現,JDK 中併發類也是這麼作的。

NOTE: 使用 Thread.interrupt() 和 InterruptedException 中的哪一種方法表示中斷?

上面提到了一種狀況是因爲接口的限制而沒法拋出 InterruptedException,這時你別無選擇,只能用 Thread.interrupt() 恢復中斷。除了這種狀況,其它的時候推薦使用 InterruptedException 來表示中斷。當方法聲明拋出 InterruptedException 時,它就是在告訴調用者,我這個方法可能會花費不少的時間,而你能夠經過線程中斷來終止調用。經過 InterruptedException 來表示中斷,含義更清晰,反應也更迅速。

===========END===========

相關文章
相關標籤/搜索