java併發編程(六): 取消與關閉

取消與關閉:

  • 如何正確安全地取消或關閉任務。

任務取消:

  • 若外部代碼能在某個操做正常完成以前將其置入「完成」狀態,則還操做是可取消的
  • 取消操做的緣由:

       1. 用戶請求取消。 java

       2. 有時間限制的操做,如超時設定web

       3. 應用程序事件。 安全

       4. 錯誤。 異步

       5. 關閉。 jvm

以下面這種取消操做實現: socket

/**
 * 一個可取消的素數生成器
 * 使用volatile類型的域保存取消狀態
 * 經過循環來檢測任務是否取消
 */
@ThreadSafe
public class PrimeGenerator implements Runnable {
	private final List<BigInteger> primes = new ArrayList<>();
	private volatile boolean canceled;
	
	@Override
	public void run() {
		BigInteger p = BigInteger.ONE;
		while (!canceled){
			p = p.nextProbablePrime();
			synchronized (this) { //同步添加素數
				primes.add(p);
			}
		}
	}
	
	/**
	 * 取消生成素數
	 */
	public void cancel(){
		canceled = true;
	}
	
	/**
	 * 同步獲取素數
	 * @return 已經生成的素數
	 */
	public synchronized List<BigInteger> get(){
		return new ArrayList<>(primes);
	}
}

其測試用例: ide

public class PrimeGeneratorTest {
	public static void main(String[] args) {
		PrimeGenerator pg = new PrimeGenerator();
		new Thread(pg).start();
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally{
			pg.cancel(); //始終取消
		}
		
		System.out.println("all primes: " + pg.get());
	}
}

中斷:

  • 調用interrupt並不意味者當即中止目標線程正在進行的工做,而只是傳遞了請求中斷的消息。會在下一個取消點中斷本身,如wait, sleep,join等。
  • 一般,中斷是實現取消的合理方式

下面經過中斷實現取消功能: 測試

/**
 * 經過中斷來實現取消
 * 不採用boolean變量,
 * 防止在queue.put()時因爲阻塞,不能檢查到boolean變量而沒法取消
 * 但使用interrupt就能夠,
 * 即便queue.put()阻塞, 也會檢查到interrupt信號,從而拋出IntteruptedException
 * 從而達到取消的目的
 */
public class PrimeProducer extends Thread {
	private final BlockingQueue<BigInteger> queue;
	
	public PrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			BigInteger p = BigInteger.ONE;
			while (!Thread.currentThread().isInterrupted()){
				queue.put(p = p.nextProbablePrime());
			}
		} catch (InterruptedException e) {
			// thread exit
		}
	}
	/**
	 * 取消
	 */
	public void cancel(){
		interrupt(); //中斷當前線程
	}
}

中斷策略:

  • 因爲每一個線程擁有各自的中斷策略,所以除非你知道中斷對該線程的含義,不然就不該該中斷這個線程。

響應中斷:

  • 處理InterruptedException的實用策略:

      1. 傳遞異常。 this

      2. 恢復中斷狀態,從而事調用棧的上層代碼可以對其進行處理。 spa

  • 只有實現了線程中斷策略的代碼才能夠屏蔽中斷請求,在常規的任務和庫代碼中都不該該屏蔽中斷請求。

經過Future實現取消:

public void timedRun(Runnable r, long timeout, TimeUnit unit)
			throws InterruptedException {
	Future<?> task = taskExec.submit(r);
	try {
		task.get(timeout, unit);
	} catch (ExecutionException e) {
		//任務執行中拋出異常
	} catch (TimeoutException e) {
		//任務超時處理
	} finally{
		//任務執行完畢,沒有影響; 任務執行中會中斷任務
		if (task != null) task.cancel(true);
	}
}
  • 當Future.get拋出InterruptedExceptionTimeoutException時 ,若是你知道再也不須要結果,那麼就能夠調用Future.cancel來取消任務。

處理不可中斷的阻塞:

  • 形成線程阻塞的緣由:

       1. java.io包中的同步Socket I/O。如套接字中進行讀寫操做read, write方法。

       2. java.io包中的同步I/O如當中斷或關閉正在InterruptibleChannel上等待的線程時,會對應拋出ClosedByInterruptException或                         AsynchronousCloseException

       3. Selector的異步I/O。若是一個線程在調用Selector.select時阻塞了,那麼調用close, wakeup會使線程拋出ClosedSelectorException

       4. 獲取某個鎖。當一個線程等待某個鎖而阻塞時,不會響應中斷。但Lock類的lockInterruptibly容許在等待鎖時響應中斷。

/**
 * 經過改寫interrupt方法將非標準的取消操做封裝在Thread中
 */
public class ReaderThread extends Thread {
	private final Socket socket;
	private final InputStream in;
	private int bufferSize;
	
	public ReaderThread(Socket socket, InputStream in) {
		this(socket, in, 1024);
	}
	
	public ReaderThread(Socket socket, InputStream in, int bufferSize) {
		this.socket = socket;
		this.in = in;
		this.bufferSize = bufferSize;
	}

	@Override
	public void interrupt() {
		try {
			socket.close(); //中斷前關閉socket
		} catch (IOException e) {
			
		} finally{
			super.interrupt();
		}
	}

	@Override
	public void run() {
		try {
			byte[] buf = new byte[bufferSize];
			while (true) {
				int count = in.read(buf);
				if (count < 0) {
					break;
				} else if (count > 0) {
					processBuffer(buf, count);
				}
			}
		} catch (IOException e) {
			// 線程中斷處理
		}
	}
       ...
}

採用newTaskFor來封裝非標準的取消:

/**
 * 可取消的任務接口
 */
public interface CancellableTask<T> extends Callable<T> {
	void cancel();
	RunnableFuture<T> newTask();
}

/**
 * 使用了Socket的任務
 * 在取消時須要關閉Socket
 */
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
	private Socket socket;

	public void setSocket(Socket socket) {
		this.socket = socket;
	}

	@Override
	public T call() throws Exception {
		//do working
	       ...
	}

	@Override
	public synchronized void cancel() {
		try {
			if (socket != null){
				socket.close();
			}
		} catch (IOException ignored) {
		}
	}

	@Override
	public RunnableFuture<T> newTask() {
		return new FutureTask<T>(this){
			@Override
			public boolean cancel(boolean mayInterruptIfRunning) {
				try {
					SocketUsingTask.this.cancel();
				} catch (Exception ignored) {
				}
				return super.cancel(mayInterruptIfRunning);
			}
		};
	}
}

/**
 * 經過newTaskFor將非標準的取消操做封裝在任務中
 */
public class CancellingExecutor extends ThreadPoolExecutor {

	public CancellingExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
		if (callable instanceof CancellableTask){ //如果咱們定製的可取消任務
			return ((CancellableTask<T>)callable).newTask();
		}
		return super.newTaskFor(callable);
	}
}

中止基於線程的服務:

  • 正確的封裝原則:除非擁有某個線程,不然不能對該線程進行操控。如中斷線程修改線程優先級等。
  • 對於持有線程的服務,只要服務的存在時間大於建立線程的方法的存在時間,那麼就應該提供生命週期的方法。如ExecutorService提供的shutdown(), shutdownNow()
/**
 * 不支持關閉的生產者-消費者日誌服務
 */
public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	
	public LogWriter(Writer writer){
		this.queue = new LinkedBlockingDeque<String>();
		this.logger = new LoggerThread(writer);
	}
	
	public void start(){
		logger.start();
	}
	
	public void log(String msg) throws InterruptedException{
		queue.put(msg);
	}

	private class LoggerThread extends Thread{
		private final Writer writer;
		
		public LoggerThread(Writer writer) {
			this.writer = writer;
		}

		@Override
		public void run() {
			try {
				while(true){
					writer.write(queue.take());
				}
			} catch (IOException e) {
				// io exception handle
			} catch (InterruptedException e) {
				// interrupt exceptino handle
			} finally{
				try {
					writer.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
  • 向LogWriter添加可靠的取消操做
/**
 * 爲LoggerWriter添加可靠的取消操做
 */
public class LogService {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	private final PrintWriter writer;
	private boolean isShutdown; //用於終止生產者
	private int reservations; //隊列中的消息數
	
	public LogService(PrintWriter writer){
		this.queue = new LinkedBlockingDeque<String>();
		this.logger = new LoggerThread();
		this.writer = writer;
	}
	
	/**
	 * 產生日誌
	 * @param msg 日誌內容
	 * @throws InterruptedException
	 */
	public void log(String msg) throws InterruptedException{
		synchronized (this) {
			if (isShutdown){
				throw new IllegalStateException("can't log, service has stopped.");
			}
			++reservations;
		}
		queue.put(msg);
	}
	
	/**
	 * 啓動日誌香妃
	 */
	public void start(){
		logger.start();
	}
	/**
	 * 中止日誌服務
	 */
	public void stop(){
		synchronized(this){
			isShutdown = true;
		}
		logger.interrupt(); //中斷日誌線程
	}
	
	/**
	 * 消費日誌線程
	 */
	private class LoggerThread extends Thread{
		@Override
		public void run() {
			try {
				while(true){
					try {
						synchronized (LogService.this) {
							if (isShutdown)
								break;
						}
						String msg = queue.take();
						synchronized (LogService.this) {
							--reservations;
						}
						writer.println(msg);
					} catch (InterruptedException e) {
						// retry
					}
				}
			}finally{
				writer.close();
			}
		}
	}
}

關閉ExecutorService:

  • ExecutorService提供兩種關閉服務的方法:

      1. shutdown: 安全關閉。再也不接受新任務提交,待全部隊列中的任務執行完成再關閉。

      2. shutdownNow: 強行關閉。再也不接受新任務提交,中止正在執行的任務,並返回未開始執行的任務列表。

/**
 * 封裝ExecutorService實現日誌服務
 */
public class LogService2 {
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final PrintWriter writer;
	
	public LogService2(PrintWriter writer){
		this.writer = writer;
	}
	
	/**
	 * 產生日誌
	 * @param msg 日誌內容
	 * @throws InterruptedException
	 */
	public void log(String msg) throws InterruptedException{
		exec.execute(new WriteTask(msg));
	}
	
	/**
	 * 中止日誌服務
	 * @throws InterruptedException 
	 */
	public void stop(long timeout, TimeUnit unit) throws InterruptedException{
		try {
			exec.shutdown(); //平緩關閉服務
			//關閉服務後, 阻塞到全部任務被執行完畢或者超時發生,或當前線程被中斷
			exec.awaitTermination(timeout, unit); 
		} finally{
			writer.close();
		}
	}
	...
}

"毒丸"對象:

  • 毒丸:指一個放在隊列上的對象,當獲得這個對象時,就當即中止

經過毒丸對象來關閉服務:

/**
 * 索引服務
 * 經過一個毒丸對象來關閉服務
 */
public class IndexingService {
	private static final File POISON = new File(""); //毒丸對象
	private final IndexerThread consumer = new IndexerThread(); //消費者
	private final CrawlerThread producer = new CrawlerThread(); //生產者
	private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>();
	private final File root;
	
	public IndexingService(File root) {
		this.root = root;
	}

	/**
	 * 啓動索引服務
	 */
	public void start(){
		producer.start();
		consumer.start();
	}
	
	public void stop(){
		producer.interrupt(); //中斷爬蟲線程
	}
	
	public void awaitTermination() throws InterruptedException{
		consumer.join(); //等待消費者線程結束
	}
	
	/**
	 * 爬蟲線程
	 */
	private class CrawlerThread extends Thread{
		@Override
		public void run() {
			try {
				crawl(root);
			} catch (InterruptedException e) {
				// handle the exception
			} 
			try {
				while(true){
					queue.put(POISON);
					break;
				}
			} catch (InterruptedException e) {
				// retry
			}
		}

		private void crawl(File root) throws InterruptedException{
			// crawl from web
		}
	}
	
	/**
	 * 創建索引的線程
	 */
	private class IndexerThread extends Thread{
		@Override
		public void run() {
			try {
				while (true){
					File file = queue.take();
					if (file == POISON){ //如果毒丸對象
						break;
					} else{
						indexFile(file); //創建索引文件
					}
				}
			} catch (InterruptedException e) {
				// handle exception
			}
		}

		private void indexFile(File file) {
			
		}
	}
}

shutdownNow的侷限性:

  • 在關閉服務過程當中,咱們沒法經過常規方法來得知哪些任務已經開始但未結束。
/**
 * 在ExecutorService中跟蹤在關閉以後被取消的任務
 */
public class TrackingExecutor extends AbstractExecutorService {
	private final ExecutorService exec;
	private final Set<Runnable> tasksCancelledAtShutdown = 
			Collections.synchronizedSet(new HashSet<Runnable>());
	
	public TrackingExecutor(ExecutorService exec) {
		this.exec = exec;
	}

	/**
	 * 獲取關閉後取消的任務
	 */
	public List<Runnable> getCancelledTasks(){
		if (!exec.isTerminated()){
			throw new IllegalStateException("service doesn't stop");
		}
		return new ArrayList<>(tasksCancelledAtShutdown);
	}
	
	@Override
	public void execute(final Runnable command) {
		exec.execute(new Runnable() {
			@Override
			public void run() {
				try {
					command.run();
				} finally{ //有可能出現誤報: 任務執行完畢了, 線程池
					if (isShutdown() && //若Executor已經關閉了
							Thread.currentThread().isInterrupted()){ //且當前線程被中斷了
						tasksCancelledAtShutdown.add(command);
					}
				}
			}
		});
	}
}

處理非正常的線程終止:

  • 當一個線程因爲未捕獲異常而退出時, jvm會把這個事件報告給應用程序提供的UncaughtExceptionHandler異常處理器。若沒有提供任何異常處理器,則默認行爲是將棧追蹤信息輸出到System.err
/**
 * 將異常寫入日誌的UncaughtExceptionHandler
 */
public class UEHLogger implements UncaughtExceptionHandler {

	@Override
	public void uncaughtException(Thread t, Throwable e) {
		Logger logger = Logger.getAnonymousLogger();
		logger.log(Level.SEVERE, "the thread with exceptoin: "+t.getName(), e);
	}
}
  • 在運行時間較長的應用程序中,一般會爲全部線程的未捕獲異常制定同一個異常處理器,而且該處理器至少會將異常信息記錄到日誌中。

JVM關閉:

關閉鉤子:

  • 關閉鉤子:經過Runtime.addShutdownHook註冊的但還沒有開始的線程。
  • jvm不保證關閉鉤子的調用順序
  • 強制關閉jvm時,不會運行關閉鉤子。
  • 最後對全部服務使用同一個關閉鉤子,防止多個鉤子之間的出現共享資源競爭

守護線程:

  • 守護線程(Daemon Thread):執行一些輔助工做,不會阻礙JVM的關閉
  • 線程分爲普通線程守護線程。jvm啓動時建立的全部線程中,除了主線程,其餘線程都是守護線程(例如垃圾回收器以及其餘執行輔助工做的線程)。
  • 新建立的線程,默認會繼承建立它的線程的守護狀態,因此默認時,主線程建立的全部線程都是普通線程
  • 普通線程與守護線程之間的差別:當一個線程退出時,jvm會檢查其餘正在運行的線程,若是這些線程是守護線程,那麼jvm會正常退出操做。當jvm中止時,全部仍然存在的守護線程都會被拋棄-不執行finally塊不執行回捲棧,而直接退出。全部儘量少用守護線程,特別是包含一些I/O操做的任務。
  • 守護線程一般不能用來替代應用程序管理程序中各個服務的生命週期

終結器:

  • 避免使用終結器

不吝指正。

相關文章
相關標籤/搜索