JUC系列四:任務的取消與關閉

在大多數狀況下,咱們建立一個任務,都會讓它運行直到結束。但有時,又須要在某種狀況下取消任務,好比用戶請求取消,有時間限制的任務,任務運行時出現錯誤等等。在Java中,沒有一種安全的搶佔式方式來中止線程(什麼意思?),所以也沒有安全的搶佔式方法來中止任務。html

###標識 在前面的例子中,咱們曾使用volatile來修飾一個變量做爲方法退出的一種標識,而在任務中,咱們一樣可使用它來使得任務在須要的狀況下退出。在下面的例子中,PrimeGenerator每次在生成素數以前都會檢查canceled標識,若是爲true,則當即退出任務。java

public class PrimeGenerator implements Runnable{
	private final List<BigInteger> primes=new ArrayList<>();
	private volatile boolean cancelled;
	public void run(){
		BigInteger p=BigInteger.ONE;
		while(!cancelled){
			p=p.nextProbablePrime();
			synchronized(primes){
				primes.add(p);
			}
		}
	}
	public void cancel(){
		cancelled=true;
	}
	public synchronized List<BigInteger> get(){
		return new ArrayList<BigInteger>(primes);
	}
	public static void main(String []args) throws Exception{
		PrimeGenerator generator=new PrimeGenerator();
		new Thread(generator).start();
		try{
			Thread.sleep(100);
		}finally{
			generator.cancel();
		}
		List<BigInteger> results=generator.get();
		for(BigInteger i:results){
			System.out.println(i);
		}
	}
}

但這種方式只能知足一部分需求,若是在任務執行代碼中存在線程阻塞的方法(sleep(),wait()...),那麼就可能存在一個很嚴重的問題,任務不能如指望的那樣及時退出,甚至可能永遠不會退出。安全

public class BrokenPrimeProducer extends Thread{
	private final BlockingQueue<BigInteger> queue;
	private volatile boolean cancelled=false;
	public BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue=queue;
	}
	public void run(){
		try{
			BigInteger p=BigInteger.ONE;
			while(!cancelled){
				queue.put(p=p.nextProbablePrime());
			}
		}catch(InterruptedException e){}
	}
	public void cancel(){
		cancelled=true;
	}
	public static void main(String []args) throws Exception{
		BlockingQueue<BigInteger> primes=new LinkedBlockingQueue<BigInteger>(10);
		BrokenPrimeProducer producer=new BrokenPrimeProducer(primes);
		producer.start();
		int count=0;
		try{
			while(count<10){
				count++;
				Thread.sleep(1000);
				System.out.println(primes.take());
			}
		}finally{
			producer.cancel();
		}
		System.out.println("over..");
	}
}

在上面的例子中,BrokenPrimeProducer用於生產素數,並將結果保存在阻塞隊列中,而main方法則在不斷的從隊列中讀取素數。可是程序在運行到producer.cancel()以後,生產者線程並無如期的中止下來。這是由於,當隊列已滿時,queue.put()將會阻塞,而此時count>=10,再也不執行primes.take(),那麼在調用producer.cancel()時,因爲producer一直阻塞在queue.put方法處,使得線程不能檢查到cancelled標識,致使線程永遠不會結束。服務器

###線程中斷app

對於須要取消 存在阻塞操做的任務,則不能使用檢查標識的方式,而是經過線程中斷機制。每一個線程都有一個boolean類型的中斷狀態,當中斷線程時,該狀態會被設置爲true。在Thread類中,定義interrupt方法來中斷線程目標,而isInterrupted方法能返回中斷狀態。靜態的interrupted方法將清除當前線程的中斷狀態並返回它以前的值。異步

須要注意的是:socket

  • 線程中斷是一種協做機制,線程能夠經過這種機制來通知另外一個線程,告訴它在合適的或者可能的狀況下中止當前的工做。this

  • 線程中斷並不表明線程終止,線程的中斷只是改變了線程的中斷狀態,這個中斷狀態改變後帶來的結果取決於這個程序自己.net

  • 調用interrupt方法並非當即中止線程,而是發出了一箇中斷請求,而後有線程自己在某個合適的時間點中斷本身。對於wait(),sleep()等阻塞方法來講,將嚴格處理這種請求,當他們收到中斷請求或者開始執行發現中斷狀態被設置了,將拋出一個異常並將中斷狀態復位。線程

  • 一般,中斷是實現取消的最合理的方式

經過中斷實現取消操做

public class PrimeProducer extends Thread{
	private final BlockingQueue<BigInteger> queue;
	public PrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue=queue;
	}
	public void run(){
		try{
			BigInteger p=BigInteger.ONE;
			while(!Thread.currentThread().isInterrupted()){
				queue.put(p=p.nextProbablePrime());
			}
		}catch(InterruptedException e){
			
		}
	}
	public void cancel(){
		interrupt();
	}
}

###響應中斷 當調用可阻塞的方法時,例如Thread.sleep()或BlockingQueue.put等,有兩種實用策略可用於處理InterruptedException:

  • 傳遞異常(拋出異常),從而使你的方法也成爲可中斷的阻塞方法

  • 恢復中斷,從而使調用棧中的上層代碼可以對其進行處理

####傳遞異常 傳遞InterruptedException的方法包括根本就不捕獲該異常,直接向上拋出,與將InterruptedException添加到throws子句中同樣(如程序清單getNextTask所示)。或者捕獲異常,在執行完某些操做後(清理),再拋出該異常。

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException{
	return queue.take();
}

####恢復中斷狀態

有時候不能拋出InterruptedException,例如在Runnable的run方法中,則必須捕獲該異常,並經過調用當前線程的interrupt方法恢復中斷狀態。這樣,在調用棧中更高層的代碼將看到引起了一箇中斷。

public void run() {   
    while (!Thread.currentThread().isInterrupted()) {   
        try {   
            ...   
            sleep(delay);   
        } catch (InterruptedException e) {
	    //拋出InterruptedException異常後,中斷標示位會自動清除
            Thread.currentThread().interrupt();//從新設置中斷標示   
        }   
    }   
}

另一種狀況是

public void mySubTask() {   
    ...   
    try {   
        sleep(delay);   
    } catch (InterruptedException e) {   
	//拋出InterruptedException異常後,中斷標示位會自動清除
        Thread.currentThread().interrupted();   
    }   
}
public void test(){
	while(!Thread.currentThread().isInterrupted()){
		mySubTask();
	}
}

拋出InterruptedException異常後,中斷標示位會自動清除,須要恢復中斷狀態,這樣,在test方法中才能看到mySubTask引起了一箇中斷。不然,test將繼續執行while。

###Future

在concurrent包中,ExecutorService.submit將返回一個Future來描述任務。Future擁有一個cancel方法,該方法帶有一個boolean類型的參數mayInterruptIfRunning。若是爲true而且這個線程正在運行,則線程能被中斷。若是爲false而且任務沒有運行,則該任務不會被啓動。

public static void timeOut(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
	Future<?> task=executorService.submit(r);
	try{
		task.get(timeout,unit);
	}catch(TimeoutException e){
		
	}catch(ExecutionException e){
		
	}finally{
		//若是任務已經結束,那麼執行取消操做也不會有任何影響
		task.cancel(true);//若是任務正在運行,則將被中斷
	}
}

###不可中斷的阻塞

在前面的例子中,只是針對某些可阻塞的方法作中斷請求,在Java庫中,並不是全部的可阻塞方法或者阻塞機制都能響應中斷;若是一個線程因爲執行同步的Socket I/O或者等待得到內置鎖而阻塞,那麼中斷請求只能設置線程的中斷狀態,除此以外沒有任何其它用處。那些因爲執行不可中斷操做操做而被阻塞的線程,可使用相似於中斷的手段來中止這些線程,但這要求咱們必須知道線程阻塞的緣由;

Socket I/O:在服務器應用程序中,常見的阻塞IO形式就是對套接字進行讀取和寫入。雖然InputStream和OutputStream中的read和write等方法都不會響應中斷,但經過關閉底層套接字,可使得因爲執行read或write等方法而被阻塞的線程拋出一個SocketException

同步 I/O:當中斷一個正在InterruptibleChannel上等待的線程時,將拋出ClosedByInterruptException並關閉鏈路。當關閉一個InterruptibleChannel時,將致使全部在鏈路操做上阻塞的線程都拋出AsynchronousCloseException。大多數標準的Channel都實現了InterruptibleChannel

Selector異步I/O:若是一個線程在調用Selector.select方法時阻塞,那麼調用close或者wakeup方法會使線程拋出CloseSelectorException並提早返回

獲取某個鎖:若是一個線程因爲等待某個內置鎖而阻塞,那麼將沒法響應中斷。但可使用Lock類作替代

下面給出一箇中斷套接字阻塞的例子

import java.io.IOException;
import java.net.ServerSocket;

public class ThreadTest extends Thread {
    volatile ServerSocket socket;

    public static void main(String args[]) throws Exception {
        ThreadTest1 thread = new ThreadTest1();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        thread.socket.close();// 再調用close方法,此句去掉將則不會
        System.out.println("Stopping application...");
    }

    public void run() {
        try {
            socket = new ServerSocket(3036);
        } catch (IOException e) {
            System.out.println("Could not create the socket...");
            return;
        }
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Waiting for connection...");
            try {
                socket.accept();
            } catch (IOException e) {
                System.out.println("accept() failed or interrupted...");
                Thread.currentThread().interrupt();// 從新設置中斷標示位
            }
        }
        //判斷線程是否被阻塞,若是被阻塞則沒法打印此句
        System.out.println("Thread exiting under request...");
    }
}

###日誌服務 在代碼清單LogWriter中給出了一個簡單的日誌服務示例,其中日誌操做在單獨的日誌線程中執行。產生日誌消息的線程並不會將消息直接輸出,而是將其保存在一個阻塞隊列中。這是一種多生產者單消費者的設計方式。

public class LogWriter{
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	public LogWriter(Writer writer){
		this.queue=new LinkedBlockingQueue<String>(10);
		this.logger=new LoggerThread(writer);
	}
	public void start(){
		logger.start();
	}
	public void put(String msg) throws InterruptedException{
		queue.put(msg);
	}
	private class LoggerThread extends Thread{
		private final Writer writer;
		public void run(){
			try{
				while(true){
					writer.println(queue.take());
				}
			}catch(InterruptedException e){
				
			}finally{
				writer.close();
			}
		}
	}
}

當咱們想要中止日誌服務時,只須要在queue.take()方法捕獲拋出的InterruptedException異常,退出日誌服務便可。但這種退出方式是不完備的,首先,對於正在等待被寫入到日誌的信息將會丟失,其次,因爲隊列已滿時put操做會阻塞,因此等待put的線程也會被阻塞。這種狀態下,生產者和消費者須要同時被取消。因爲生產者不是專門的線程,所以要取消他們將很是困難。

另外一種關閉LogWriter的方式是設置一個「已請求關閉」的標識,以免進一步提交日誌。

public class LogService{
	private final BlockingQueue<String> queue;
	private fina LoggerThread loggerThread;
	private final PrintWriter writer;
	private boolean isShutDown;
	private int reservations;

	public void start(){
		loggerThread.start();
	}
	public void stop(){
		synchronized(this){
			isShutDown=true;
		}
		loggerThread.interrupt();
	}
	public void log(String msg) throws InterruptedException{
		synchronized(this){
			if(isShutDown){
				throw new InterruptedException("");
			}
			++reservations;
		}
		//由於put操做原本就是同步的,因此不須要再加內置鎖
		queue.put(msg);
	}
	private class LoggerThread extends Thread{
		public void run(){
			try{
				while(true){
					try{
						//若是處理完了阻塞隊列中的日誌則退出
						synchronized(LogService.this){
							if(isShutdown&&reservations==0){
								break;
							}
						}
						String msg=queue.take();
						synchronized(LogService.this){
							--reservations;
						}
						writer.println(msg);
					}catch(InterruptedException e){
						
					}
				}
			}finally{
				writer.close();
			}
		}
	}
}

更簡單的使用ExecutorService來管理

public class LogService{
	private final ExecutorService executorService=...
	...
	public void start(){
		
	}
	public void stop() throws InterruptedException{
		try{
			executorService.shutdown();//再也不接收新的任務
			executorService.awaitTermination(TIMEOUT,UNIT);//等待關閉時間
		}finally{
			writer.close();
		}
	}
	public void log(String msg){
		try{
			exectorService.submit(new Task(msg));
		}catch(RejectedExecutionException e){
			
		}
	}
}

參考

相關文章
相關標籤/搜索