java併發編程(五): 任務執行

任務執行:

  • 大多數併發應用程序都是圍繞"任務執行"來構造的:任務一般是一些抽象的離散的工做單元。

在線程中執行任務:

  • 理想狀況下,各個任務之間是相互獨立的:任務並不依賴其餘任務的狀態結果邊界效應

串行地執行任務:

/**
 * 串行處理請求:
 * 簡單正確,但性能低下
 */
public class SingleThreadWebServer {
	public static void main(String[] args) throws IOException {
		ServerSocket server = new ServerSocket(80);
		boolean listening = true;
		while (listening){
			Socket connection = server.accept(); //阻塞等待客戶端鏈接請求
			handlerRequest(connection);
		}
		server.close();
	}
       ...
}

顯示地爲任務建立線程:

/**
 * 爲每個用戶請求建立一個線程爲其服務
 */
public class ThreadPerTaskWebServer {
	public static void main(String[] args) throws IOException {
		ServerSocket server = new ServerSocket(80);
		boolean listening = true;
		while (listening){
			final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求
			Runnable task = new Runnable() {
				@Override
				public void run() {
					handlerRequest(connection);
				}
			};
			new Thread(task).start();
		}
		server.close();
	}
       ...
}
上面的實現至少能給咱們一些暗示:
  • 任務處理過程從主線程分離出來,以提升主線程響應其餘請求的能力。
  • 任務能夠並行處理,提升吞吐量。
  • 任務處理代碼必須線程安全

但這樣無限制建立會帶來不足:

  • 線程生命週期的開銷很是高。線程的建立和銷燬都是有代價的,不一樣平臺開銷也不一樣。
  • 資源消耗。太多線程會消耗系統資源,如空閒線程的內存佔用,大量線程競爭CPU時產生其餘性能開銷等。
  • 穩定性。可建立線程數會受到限制,如jvm啓動參數(如-Xss等),Thread構造函數請求的棧大小,以及底層操做系統對線程的限制(32位機器上,主要限制因素爲線程棧的尋址空間)等,破壞這些限制,極可能拋出OutOfMemoryError異常。

Executor框架:

  • 任務是一組邏輯工做單元,而線程則是使任務異步執行的機制
  • java類庫中,任務執行的抽象不是Thread, 而是Executor
/**
 * 基於線程池的Web服務器
 */
public class ThreadPerTaskWebServer {
	private static final int NTHREADS = 100;
	/**
	 * 建立固定線程數量的線程池
	 */
	private static final Executor exec = 
			Executors.newFixedThreadPool(NTHREADS);
	public static void main(String[] args) throws IOException {
		ServerSocket server = new ServerSocket(80);
		boolean listening = true;
		while (listening){
			final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求
			Runnable task = new Runnable() {
				@Override
				public void run() {
					handlerRequest(connection);
				}
			};
			exec.execute(task);
		}
		server.close();
	}
       ...
}

執行策略:

執行策略須要考慮的有: java

  • 什麼線程中執行任務。
  • 任務按照什麼順序執行(FIFO, LIFO, 優先級)。
  • 多少個任務能併發執行。
  • 在隊列中有多少個任務在等待執行。
  • 因爲過載,系統應如何拒絕任務?若是通知任務被拒絕
  • 執行任務先後,應該作什麼?

線程池:

  • 線程池:管理一組同構工做線程的資源池。

Executors提供了幾種建立線程池的方法: 安全

//建立固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,如有線程發生異常,則會從新建立
public static ExecutorService newFixedThreadPool(int nThreads) {...}
//建立單個線程來執行任務,若該線程發生異常,會建立一個新的線程。該池可按順序執行隊列中的任務(如FIFO,LIFO,優先級等)
public static ExecutorService newSingleThreadExecutor() {...}
//該線程池無長度限制,在線程過多時會回收,過少時會建立
public static ExecutorService newCachedThreadPool() {...}
//建立一個固定長度的線程池,並以延遲或定時的方式執行任務
public static ScheduledExecutorService newScheduledThreadPool(...}

Executor的生命週期:

  • 咱們能夠經過ExecutorService來對線程池進行生命週期的管理:
public interface ExecutorService extends Executor {

    void shutdown();//平緩關閉,不接受新任務,待提交的任務執行完畢後,再關閉
    List<Runnable> shutdownNow();//粗暴關閉,嘗試取消全部執行中的任務,再也不啓動隊列中還沒有開始執行的任務
    boolean isShutdown(); //是否已關閉
    boolean isTerminated(); //是否已終止
    boolean awaitTermination(long timeout, TimeUnit unit)//等待ExecutorService到達終止狀態
        throws InterruptedException;
    ...
}
  • ExecutorService生命週期狀態:運行關閉已終止
/**
 * 對線程池進行生命週期管理
 */
public class LifecycleWebServer {
	private static final int NTHREADS = 100;
	private final ServerSocket server;
	
	public LifecycleWebServer() throws IOException{
		server = new ServerSocket(80);
	}
	
	/**
	 * 建立固定線程數量的線程池
	 */
	private static final ExecutorService exec = 
			Executors.newFixedThreadPool(NTHREADS);
	
	public void start() throws IOException{
		while (!exec.isShutdown()){
			try {
				final Socket connection = server.accept(); //阻塞等待客戶端鏈接請求
				Runnable task = new Runnable() {
					@Override
					public void run() {
						handlerRequest(connection);
					}
				};
				exec.execute(task);
			} catch (RejectedExecutionException e) {
				if (!exec.isShutdown()){
					//task submission is rejected
				}
			}
		}
	}
	
	public void stap() throws IOException{
		exec.shutdown(); //平緩關閉線程池
		server.close();
	}

	private static void handlerRequest(Socket connection) {
		// handle request
	}
}

延遲任務與週期任務:

  • 建議經過ScheduledThreadPoolExecutor來代替Timer,TimerTask
  • Timer基於絕對時間,ScheduledThreadPoolExecutor基於相對時間
  • Timer執行全部定時任務只能建立一個線程,若某個任務執行時間過長,容易破壞其餘TimerTask的定時精確性
  • 已經調度但未執行的TimerTask將不會再執行,新的任務也不會被調度,出現"線程泄漏",如:
/**
 * 錯誤的Timer行爲,Timer是脆弱的
 */
public class OutOfTime {
	public static void main(String[] args) throws InterruptedException {
		Timer timer = new Timer();
		timer.schedule(new ThrowTask(), 1); //第一個任務拋出異常
		Thread.sleep(1000); 
		timer.schedule(new ThrowTask(), 1); //第二個任務將不能再執行, 並拋出異常Timer already cancelled.
		Thread.sleep(5000);
		System.out.println("end.");
	}
	
	static class ThrowTask extends TimerTask{

		@Override
		public void run() {
			throw new RuntimeException("test timer's error behaviour");
		}
	}
}

串行的頁面渲染器:

/**
 * 串行地渲染頁面元素, 性能很低下
 * 下載圖片過程當中有可能IO時間長阻塞,
 * CPU沒能有效利用
 */
public class SingleThreadRenderer {
	void rendererPage(CharSequence source){
		renderText(source);
		List<ImageData> imageDatas = new ArrayList<>();
		//解析文本中的圖片鏈接
		for (ImageInfo imageInfo : scanForImage(source)){ 
			imageDatas.add(imageInfo.downloadImageData()); //下載圖片
		}
		//渲染圖片
		for (ImageData data : imageDatas){
			renderImage(data);
		}
	}
       ...
}

攜帶結果的任務Callable與Future:

  • Executor執行任務的4個生命週期:建立提交開始完成
  • Executor框架中,能夠取消已提交但未開始執行的任務,對於已經開始執行的任務,只能當他們能響應中斷時,才能取消,取消已經完成的任務不會有影響。
/**
 * 使用Future等待圖像下載
 * 將渲染過程分爲:
 *     IO密集型(下載圖像)
 *     CPU密集型(渲染頁面)
 * 但這裏仍然必須圖片下載完成了才能看到頁面,只是縮短了總時間
 */
public class FutureRenderer {
	private final ExecutorService exec = Executors.newFixedThreadPool(10);
	
	void rendererPage(CharSequence source){
		final List<ImageInfo> imageInfos = scanForImage(source); //抽出圖片連接信息
		Callable<List<ImageData>> task = 
				new Callable<List<ImageData>>() {
					@Override
					public List<ImageData> call() throws Exception {
						List<ImageData> result = 
								new ArrayList<>();
						for (ImageInfo imageInfo : imageInfos){
							result.add(imageInfo.downloadImageData()); //下載圖片
						}
						return result;
					}
				};
		Future<List<ImageData>> future = exec.submit(task); //提交下載圖片的任務
		renderText(source); //渲染文本
		try {
			List<ImageData> imageDatas = future.get();//阻塞獲取下載的圖片
			for (ImageData data : imageDatas){ //渲染圖片
				renderImage(data); 
			}
		} catch (InterruptedException e) {
			//從新設置線程的中斷狀態
			Thread.currentThread().interrupt();
			future.cancel(true);
		} catch (ExecutionException e) {
			// handle exception
		} 
	}
        ...
}

異構任務並行化中存在的侷限:

  • 當異構任務之間的執行效率懸殊很大時,對於總體的性能提高來看並非頗有效。好比要是上面下載圖片過程時間比渲染文本時間長不少,那麼總體併發提高並非很明顯。

CompletionService: Executor與BlockingQueue:

/**
 * 使用CompletionService, 使頁面元素在下載完成後當即顯示出來
 * 相似Mobile中的新聞加載,圖片時被異步加載的
 */
public class Renderer {
	private final ExecutorService executor;
	
	public Renderer(ExecutorService executor){
		this.executor = executor;
	}
	
	void rendererPage(CharSequence source){
		final List<ImageInfo> imageInfos = scanForImage(source); //抽出圖片連接信息
		CompletionService<ImageData> completionService
					= new ExecutorCompletionService<>(this.executor);
		for (final ImageInfo imageInfo : imageInfos){
			//提交下載圖片的任務, 每下載一個圖片就是一個任務,達到下載圖片並行性
			completionService.submit(new Callable<ImageData>() { //內部會將執行完後封裝的Future對象放到一個BlockingQueue中
				@Override
				public ImageData call() throws Exception {
					return imageInfo.downloadImageData();
				}
			});
		}
		renderText(source); //渲染文本
		try {
			for (int i=0, n=imageInfos.size(); i<n; i++){
				Future<ImageData> f = completionService.take();
				ImageData imageData = f.get();
				renderImage(imageData); //渲染圖片
			}
		} catch (InterruptedException e) {
			//從新設置線程的中斷狀態
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			// handle exception e.getCause()
		} 
	}
       ...
}

爲任務設置超時:

任務超時設置可經過Future的get超時版本: 服務器

V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
你能夠捕獲其TimeoutException來作相應處理便可。

提交多個任務:

能夠經過ExecutorService提交一組任務: 多線程

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
不吝指正。
相關文章
相關標籤/搜索