1. 用戶請求取消。 java
2. 有時間限制的操做,如超時設定。 web
3. 應用程序事件。 安全
4. 錯誤。 異步
5. 關閉。 jvm
以下面這種取消操做實現: socket
/** * 一個可取消的素數生成器 * 使用volatile類型的域保存取消狀態 * 經過循環來檢測任務是否取消 */ @ThreadSafe public class PrimeGenerator implements Runnable { private final List<BigInteger> primes = new ArrayList<>(); private volatile boolean canceled; @Override public void run() { BigInteger p = BigInteger.ONE; while (!canceled){ p = p.nextProbablePrime(); synchronized (this) { //同步添加素數 primes.add(p); } } } /** * 取消生成素數 */ public void cancel(){ canceled = true; } /** * 同步獲取素數 * @return 已經生成的素數 */ public synchronized List<BigInteger> get(){ return new ArrayList<>(primes); } }
其測試用例: ide
public class PrimeGeneratorTest { public static void main(String[] args) { PrimeGenerator pg = new PrimeGenerator(); new Thread(pg).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally{ pg.cancel(); //始終取消 } System.out.println("all primes: " + pg.get()); } }
下面經過中斷實現取消功能: 測試
/** * 經過中斷來實現取消 * 不採用boolean變量, * 防止在queue.put()時因爲阻塞,不能檢查到boolean變量而沒法取消 * 但使用interrupt就能夠, * 即便queue.put()阻塞, 也會檢查到interrupt信號,從而拋出IntteruptedException * 從而達到取消的目的 */ public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; public PrimeProducer(BlockingQueue<BigInteger> queue){ this.queue = queue; } @Override public void run() { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()){ queue.put(p = p.nextProbablePrime()); } } catch (InterruptedException e) { // thread exit } } /** * 取消 */ public void cancel(){ interrupt(); //中斷當前線程 } }
1. 傳遞異常。 this
2. 恢復中斷狀態,從而事調用棧的上層代碼可以對其進行處理。 spa
public void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (ExecutionException e) { //任務執行中拋出異常 } catch (TimeoutException e) { //任務超時處理 } finally{ //任務執行完畢,沒有影響; 任務執行中會中斷任務 if (task != null) task.cancel(true); } }
1. java.io包中的同步Socket I/O。如套接字中進行讀寫操做read, write方法。
2. java.io包中的同步I/O。如當中斷或關閉正在InterruptibleChannel上等待的線程時,會對應拋出ClosedByInterruptException或 AsynchronousCloseException。
3. Selector的異步I/O。若是一個線程在調用Selector.select時阻塞了,那麼調用close, wakeup會使線程拋出ClosedSelectorException。
4. 獲取某個鎖。當一個線程等待某個鎖而阻塞時,不會響應中斷。但Lock類的lockInterruptibly容許在等待鎖時響應中斷。
/** * 經過改寫interrupt方法將非標準的取消操做封裝在Thread中 */ public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; private int bufferSize; public ReaderThread(Socket socket, InputStream in) { this(socket, in, 1024); } public ReaderThread(Socket socket, InputStream in, int bufferSize) { this.socket = socket; this.in = in; this.bufferSize = bufferSize; } @Override public void interrupt() { try { socket.close(); //中斷前關閉socket } catch (IOException e) { } finally{ super.interrupt(); } } @Override public void run() { try { byte[] buf = new byte[bufferSize]; while (true) { int count = in.read(buf); if (count < 0) { break; } else if (count > 0) { processBuffer(buf, count); } } } catch (IOException e) { // 線程中斷處理 } } ... }
/** * 可取消的任務接口 */ public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } /** * 使用了Socket的任務 * 在取消時須要關閉Socket */ public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; public void setSocket(Socket socket) { this.socket = socket; } @Override public T call() throws Exception { //do working ... } @Override public synchronized void cancel() { try { if (socket != null){ socket.close(); } } catch (IOException ignored) { } } @Override public RunnableFuture<T> newTask() { return new FutureTask<T>(this){ @Override public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } catch (Exception ignored) { } return super.cancel(mayInterruptIfRunning); } }; } } /** * 經過newTaskFor將非標準的取消操做封裝在任務中 */ public class CancellingExecutor extends ThreadPoolExecutor { public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask){ //如果咱們定製的可取消任務 return ((CancellableTask<T>)callable).newTask(); } return super.newTaskFor(callable); } }
/** * 不支持關閉的生產者-消費者日誌服務 */ public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer){ this.queue = new LinkedBlockingDeque<String>(); this.logger = new LoggerThread(writer); } public void start(){ logger.start(); } public void log(String msg) throws InterruptedException{ queue.put(msg); } private class LoggerThread extends Thread{ private final Writer writer; public LoggerThread(Writer writer) { this.writer = writer; } @Override public void run() { try { while(true){ writer.write(queue.take()); } } catch (IOException e) { // io exception handle } catch (InterruptedException e) { // interrupt exceptino handle } finally{ try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
/** * 爲LoggerWriter添加可靠的取消操做 */ public class LogService { private final BlockingQueue<String> queue; private final LoggerThread logger; private final PrintWriter writer; private boolean isShutdown; //用於終止生產者 private int reservations; //隊列中的消息數 public LogService(PrintWriter writer){ this.queue = new LinkedBlockingDeque<String>(); this.logger = new LoggerThread(); this.writer = writer; } /** * 產生日誌 * @param msg 日誌內容 * @throws InterruptedException */ public void log(String msg) throws InterruptedException{ synchronized (this) { if (isShutdown){ throw new IllegalStateException("can't log, service has stopped."); } ++reservations; } queue.put(msg); } /** * 啓動日誌香妃 */ public void start(){ logger.start(); } /** * 中止日誌服務 */ public void stop(){ synchronized(this){ isShutdown = true; } logger.interrupt(); //中斷日誌線程 } /** * 消費日誌線程 */ private class LoggerThread extends Thread{ @Override public void run() { try { while(true){ try { synchronized (LogService.this) { if (isShutdown) break; } String msg = queue.take(); synchronized (LogService.this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { // retry } } }finally{ writer.close(); } } } }
1. shutdown: 安全關閉。再也不接受新任務提交,待全部隊列中的任務執行完成再關閉。
2. shutdownNow: 強行關閉。再也不接受新任務提交,中止正在執行的任務,並返回未開始執行的任務列表。
/** * 封裝ExecutorService實現日誌服務 */ public class LogService2 { private final ExecutorService exec = Executors.newSingleThreadExecutor(); private final PrintWriter writer; public LogService2(PrintWriter writer){ this.writer = writer; } /** * 產生日誌 * @param msg 日誌內容 * @throws InterruptedException */ public void log(String msg) throws InterruptedException{ exec.execute(new WriteTask(msg)); } /** * 中止日誌服務 * @throws InterruptedException */ public void stop(long timeout, TimeUnit unit) throws InterruptedException{ try { exec.shutdown(); //平緩關閉服務 //關閉服務後, 阻塞到全部任務被執行完畢或者超時發生,或當前線程被中斷 exec.awaitTermination(timeout, unit); } finally{ writer.close(); } } ... }
經過毒丸對象來關閉服務:
/** * 索引服務 * 經過一個毒丸對象來關閉服務 */ public class IndexingService { private static final File POISON = new File(""); //毒丸對象 private final IndexerThread consumer = new IndexerThread(); //消費者 private final CrawlerThread producer = new CrawlerThread(); //生產者 private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>(); private final File root; public IndexingService(File root) { this.root = root; } /** * 啓動索引服務 */ public void start(){ producer.start(); consumer.start(); } public void stop(){ producer.interrupt(); //中斷爬蟲線程 } public void awaitTermination() throws InterruptedException{ consumer.join(); //等待消費者線程結束 } /** * 爬蟲線程 */ private class CrawlerThread extends Thread{ @Override public void run() { try { crawl(root); } catch (InterruptedException e) { // handle the exception } try { while(true){ queue.put(POISON); break; } } catch (InterruptedException e) { // retry } } private void crawl(File root) throws InterruptedException{ // crawl from web } } /** * 創建索引的線程 */ private class IndexerThread extends Thread{ @Override public void run() { try { while (true){ File file = queue.take(); if (file == POISON){ //如果毒丸對象 break; } else{ indexFile(file); //創建索引文件 } } } catch (InterruptedException e) { // handle exception } } private void indexFile(File file) { } } }
/** * 在ExecutorService中跟蹤在關閉以後被取消的任務 */ public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor(ExecutorService exec) { this.exec = exec; } /** * 獲取關閉後取消的任務 */ public List<Runnable> getCancelledTasks(){ if (!exec.isTerminated()){ throw new IllegalStateException("service doesn't stop"); } return new ArrayList<>(tasksCancelledAtShutdown); } @Override public void execute(final Runnable command) { exec.execute(new Runnable() { @Override public void run() { try { command.run(); } finally{ //有可能出現誤報: 任務執行完畢了, 線程池 if (isShutdown() && //若Executor已經關閉了 Thread.currentThread().isInterrupted()){ //且當前線程被中斷了 tasksCancelledAtShutdown.add(command); } } } }); } }
/** * 將異常寫入日誌的UncaughtExceptionHandler */ public class UEHLogger implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "the thread with exceptoin: "+t.getName(), e); } }
終結器:
不吝指正。