java-CompletionService使用小記

因爲項目性能需求,以前單線程阻塞查詢雲端數據效率太慢,須要改進。百度了下CompletionService仍是蠻符合需求的。java

package com.demo.thread.executor;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecuteMain {
	
	//線程池大小
	private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
	//最大線程
	private static final int THREAD_MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;

	public static void main(String[] args) {
		try {
			ExecuteMain main = new ExecuteMain();
			main.execService();
		} catch(InterruptedException e) {
			System.out.println("進入異常");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	//執行線程
	public void execService() throws Exception {
		ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(THREAD_MAXIMUM_POOL_SIZE ), new RejectedExecutionHandler() {
			
			@Override
			public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
				try {
                    //改成阻塞方法
					executor.getQueue().put(r);
				} catch (Exception e) {
					
				}
			}
		});
		CompletionService<Integer> completionService  = new ExecutorCompletionService<Integer>(executorService);
		try {
			List<Integer> accountIdList = genAccountId();
			for (Integer accountId : accountIdList) {
                final finalAccountId = accountId;
				completionService.submit(buildTask(accountId));
			}
			
			int resultNum = 0;
			for (int i = 0; i < accountIdList.size(); i++) {
				resultNum = completionService.take().get();
				 System.out.println("第" +(i + 1)+"個執行結果" + resultNum);  
			}
			 System.out.println("所有任務執行完成");
		} catch(InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) {
			executorService.shutdownNow(); 
			throw e;
		} finally {
			executorService.shutdownNow(); 
		}
	}
	
	//生成任務
	public Callable<Integer> buildTask(final int accountId) throws InterruptedException {
		final Random rand = new Random(); 
		Callable<Integer> task = new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				System.out.println("執行任務" + Thread.currentThread().getName());
				int time = rand.nextInt(100)*100;
				Thread.currentThread().sleep(time);
				if(accountId > 10) {
					throw new InterruptedException("throws Exception");
				}
				return accountId;
			}
		};
		return task;
	}
	
	//生成帳號
	public List<Integer> genAccountId() {
		List<Integer> accountIdList = new ArrayList<Integer>();
		accountIdList.add(3);
		accountIdList.add(1);
		accountIdList.add(2);
		accountIdList.add(4);
		accountIdList.add(5);
		accountIdList.add(6);
		accountIdList.add(7);
		accountIdList.add(8);
		accountIdList.add(9);
		accountIdList.add(10);
		accountIdList.add(11);
		accountIdList.add(12);
		accountIdList.add(13);
		accountIdList.add(14);
		accountIdList.add(15);
		accountIdList.add(16);
		accountIdList.add(17);
		accountIdList.add(18);
		return accountIdList;
	}
}

demo如上,結果以下dom

 

關於CompletionService的原理,能夠先百度瞭解下。這裏寫下注意事項ide

1.completionService.take().get(); 這裏是沒有順序的,若是程序執行時對有順序有要求,須要謹慎性能

2.當callable中出現異常,外層方法必須調用completionService.take().get();這句才能捕獲隊列的受檢查異常。ui

3.若是你callable中的方法A,又調用了另一個方法B,並且B方法存在回調行爲(好比說查詢出結果後還會作其餘處理),這時不要使用shutdownNow()進行關閉。不然可能會出現,雲端任務執行狀態和本地狀態不匹配的狀況,由於當出現異常時,你調用shutdownNow()後線程結束了,回調時本地狀態未更新。線程

4.線程任務中的參數必定要用final修飾,否則可能會出現入參和結果不匹配狀況code

相關文章
相關標籤/搜索