/** * 串行處理請求: * 簡單正確,但性能低下 */ public class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ Socket connection = server.accept(); //阻塞等待客戶端鏈接請求 handlerRequest(connection); } server.close(); } ... }
/** * 爲每個用戶請求建立一個線程爲其服務 */ public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; new Thread(task).start(); } server.close(); } ... }上面的實現至少能給咱們一些暗示:
/** * 基於線程池的Web服務器 */ public class ThreadPerTaskWebServer { private static final int NTHREADS = 100; /** * 建立固定線程數量的線程池 */ private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(80); boolean listening = true; while (listening){ final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; exec.execute(task); } server.close(); } ... }
執行策略須要考慮的有: java
Executors提供了幾種建立線程池的方法: 安全
//建立固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,如有線程發生異常,則會從新建立 public static ExecutorService newFixedThreadPool(int nThreads) {...} //建立單個線程來執行任務,若該線程發生異常,會建立一個新的線程。該池可按順序執行隊列中的任務(如FIFO,LIFO,優先級等) public static ExecutorService newSingleThreadExecutor() {...} //該線程池無長度限制,在線程過多時會回收,過少時會建立 public static ExecutorService newCachedThreadPool() {...} //建立一個固定長度的線程池,並以延遲或定時的方式執行任務 public static ScheduledExecutorService newScheduledThreadPool(...}
public interface ExecutorService extends Executor { void shutdown();//平緩關閉,不接受新任務,待提交的任務執行完畢後,再關閉 List<Runnable> shutdownNow();//粗暴關閉,嘗試取消全部執行中的任務,再也不啓動隊列中還沒有開始執行的任務 boolean isShutdown(); //是否已關閉 boolean isTerminated(); //是否已終止 boolean awaitTermination(long timeout, TimeUnit unit)//等待ExecutorService到達終止狀態 throws InterruptedException; ... }
/** * 對線程池進行生命週期管理 */ public class LifecycleWebServer { private static final int NTHREADS = 100; private final ServerSocket server; public LifecycleWebServer() throws IOException{ server = new ServerSocket(80); } /** * 建立固定線程數量的線程池 */ private static final ExecutorService exec = Executors.newFixedThreadPool(NTHREADS); public void start() throws IOException{ while (!exec.isShutdown()){ try { final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求 Runnable task = new Runnable() { @Override public void run() { handlerRequest(connection); } }; exec.execute(task); } catch (RejectedExecutionException e) { if (!exec.isShutdown()){ //task submission is rejected } } } } public void stap() throws IOException{ exec.shutdown(); //平緩關閉線程池 server.close(); } private static void handlerRequest(Socket connection) { // handle request } }
/** * 錯誤的Timer行爲,Timer是脆弱的 */ public class OutOfTime { public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new ThrowTask(), 1); //第一個任務拋出異常 Thread.sleep(1000); timer.schedule(new ThrowTask(), 1); //第二個任務將不能再執行, 並拋出異常Timer already cancelled. Thread.sleep(5000); System.out.println("end."); } static class ThrowTask extends TimerTask{ @Override public void run() { throw new RuntimeException("test timer's error behaviour"); } } }
/** * 串行地渲染頁面元素, 性能很低下 * 下載圖片過程當中有可能IO時間長阻塞, * CPU沒能有效利用 */ public class SingleThreadRenderer { void rendererPage(CharSequence source){ renderText(source); List<ImageData> imageDatas = new ArrayList<>(); //解析文本中的圖片鏈接 for (ImageInfo imageInfo : scanForImage(source)){ imageDatas.add(imageInfo.downloadImageData()); //下載圖片 } //渲染圖片 for (ImageData data : imageDatas){ renderImage(data); } } ... }
/** * 使用Future等待圖像下載 * 將渲染過程分爲: * IO密集型(下載圖像) * CPU密集型(渲染頁面) * 但這裏仍然必須圖片下載完成了才能看到頁面,只是縮短了總時間 */ public class FutureRenderer { private final ExecutorService exec = Executors.newFixedThreadPool(10); void rendererPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImage(source); //抽出圖片連接信息 Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<>(); for (ImageInfo imageInfo : imageInfos){ result.add(imageInfo.downloadImageData()); //下載圖片 } return result; } }; Future<List<ImageData>> future = exec.submit(task); //提交下載圖片的任務 renderText(source); //渲染文本 try { List<ImageData> imageDatas = future.get();//阻塞獲取下載的圖片 for (ImageData data : imageDatas){ //渲染圖片 renderImage(data); } } catch (InterruptedException e) { //從新設置線程的中斷狀態 Thread.currentThread().interrupt(); future.cancel(true); } catch (ExecutionException e) { // handle exception } } ... }
/** * 使用CompletionService, 使頁面元素在下載完成後當即顯示出來 * 相似Mobile中的新聞加載,圖片時被異步加載的 */ public class Renderer { private final ExecutorService executor; public Renderer(ExecutorService executor){ this.executor = executor; } void rendererPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImage(source); //抽出圖片連接信息 CompletionService<ImageData> completionService = new ExecutorCompletionService<>(this.executor); for (final ImageInfo imageInfo : imageInfos){ //提交下載圖片的任務, 每下載一個圖片就是一個任務,達到下載圖片並行性 completionService.submit(new Callable<ImageData>() { //內部會將執行完後封裝的Future對象放到一個BlockingQueue中 @Override public ImageData call() throws Exception { return imageInfo.downloadImageData(); } }); } renderText(source); //渲染文本 try { for (int i=0, n=imageInfos.size(); i<n; i++){ Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); //渲染圖片 } } catch (InterruptedException e) { //從新設置線程的中斷狀態 Thread.currentThread().interrupt(); } catch (ExecutionException e) { // handle exception e.getCause() } } ... }
任務超時設置可經過Future的get超時版本: 服務器
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;你能夠捕獲其TimeoutException來作相應處理便可。
能夠經過ExecutorService提交一組任務: 多線程
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;不吝指正。