java併發編程(七): 線程池的使用

線程池的使用:

  • 線程池分配調優,使用注意事項

在任務與執行策略之間的隱性耦合:

  • 有些任務須要明確指定執行策略

       1. 依賴性任務。提交的任務須要依賴其餘任務,此時須要當心維護這些執行策略以免產生活躍性問題html

       2. 使用線程封閉機制的任務。任務要求其執行所在的Executor是線程安全的java

       3. 對響應時間敏感的任務。 node

       4. 使用ThreadLocal的任務。ThreadLocal使每一個線程均可以擁有某個變量的一個私有"版本"算法

  • 只有當任務都是同類型的且相互獨立時,線程池的性能才能達到最佳

線程飢餓死鎖:

  • 在線程池中,若是全部正在執行任務的線程都因爲等待其餘仍處於工做隊列中的任務而阻塞,這種現象稱爲線程飢餓死鎖
/**
 * 在單線程Executor中任務發生死鎖 
 */
public class ThreadDeadLock {
	ExecutorService exec = Executors.newSingleThreadExecutor();
	
	public class RenderPageTask implements Callable<String>{
		@Override
		public String call() throws Exception {
			Future<String> header, footer; //頁眉, 頁腳
			header = exec.submit(new LoadFileTask("header.html"));
			footer = exec.submit(new LoadFileTask("footer.html"));
			String body = renderBody();
			//有可能發生死鎖---任務等待子任務完成
			return header.get() + body + footer.get();
		}
                ...
	}
}
  • 每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現線程"飢餓"死鎖,所以須要在代碼或配置Executor地配置文件中記錄線程池地大小限制或配置限制。

運行時間較長的任務:

  • 避免等待運行時間較長的任務而阻塞過長時間,能夠使用阻塞方法的超時版本,如Thread.join, BlockingQueue.put, CutDownLatch.await, Selector.select等。

設置線程池的大小:

  • 線程池的理想大小取決於被提交任務的類型及所部署系統的特性
  • 可根據計算任務類型進行線程池大小:如CPU密集型則可採用Runtime.avaliableProcesses()+1個線程;對於I/O密集型,因爲阻塞操做多,可以使用更多的線程,如2倍cpu核數。

配置ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, 
                          long keepAliveTime,
                          TimeUnit unit, 
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

線程的建立與銷燬:

  • CorePoolSize: 線程池基本大小,即線程池的目標大小,即在沒有任務執行時線程池的大小,而且只有在工做隊列滿了的狀況下纔會建立超出這個數量的線程
  • MaxmumPooSize: 線程池最大大小表示可同時活動的線程數量的上限。若某個線程的空閒時間超過了keepAliveTime, 則被標記爲可回收的,而且當前線程池的大小大於基本大小,這個線程將被終止。
  • newFixedThreadPool: CorePoolSize = MaxmumPoolSize。
  • newCachedThreadPool: CorePoolSize=0, MaxmumPoolSize=Integer.MAX_VALUE,比較適合執行短期任務
  • newSingleThreadPool: CorePoolSize=MaxmumPoolSize=1,其不可被重配置
  • newScheduledThreadPool: 只能設置CorePoolSize。內部實現不一樣於其餘的ThreadPoolExecutor, 而是SchedduleThreadPoolExecutor。可執行定時任務或者隔時任務

管理隊列任務:

  • 對於Executor, newCachedThreadPool工廠方法時一種很好的默認選擇,它能提供比固定大小的線程池更好的排隊性能。當須要限制當前任務的數量以知足資源管理需求時,那麼能夠選擇固定大小的線程池,就像接受網絡客戶請求的服務應用程序中,若是不進行限制,那麼很容易發生過載問題。
  • 只有當任務相互獨立時,爲線程池或工做隊列設置界限纔是合理的。若是任務之間存在依賴性,那麼有界的線程池或隊列可能致使線程"飢餓"死鎖問題。此時應該使用無界的線程池,如newCachedThreadPool

飽和策略:

  • 當有界隊列被填滿後,飽和策略開始發揮做用。
  • jdk提供的幾種飽和策略

      1. AbortPolicy(停止策略),默認的飽和策略。會拋出RejectedExecutionException異常。 安全

      2. DiscardPolicy(拋棄策略): 會拋棄該任務。 網絡

      3. DiscardOldestPolicy:會拋棄下一個將被執行的任務,而後嘗試從新提交新的任務。最好不和優先級隊列一塊兒使用,由於它會拋棄優先級最高的任務。 併發

      4. CallerRunsPolicy(調用者運行策略):將任務回退給調用者。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用execute的線程中執行該任務。 ide

/**
 * 建立一個固定大小的線程池,
 * 並採用有界隊列與"調用者運行"飽和策略
 */
public void intThreadPool() {
	ThreadPoolExecutor executor = 
			new ThreadPoolExecutor(N_THREADS, N_THREADS,
					0L, TimeUnit.MILLISECONDS, 
					new LinkedBlockingQueue<Runnable>(CAPACITY));
	executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
 * 使用Semaphore來控制任務的提交速率
 */
public class BoundedExecutor {
	private final Executor exec;
	private final Semaphore semaphore;
	
	public BoundedExecutor(Executor exec, int bound) {
		this.exec = exec;
		this.semaphore = new Semaphore(bound);
	}
	
	public void submitTask(final Runnable command){
		try {
			semaphore.acquire(); //提交任務前請求信號量
			exec.execute(new Runnable() {
				@Override
				public void run() {
					try{
						command.run();
					} finally{
						semaphore.release(); //執行完釋放信號
					}
				}
			});
		} catch (InterruptedException e) {
			// handle exception
		}
	}
}

線程工廠:

  • 咱們能夠經過定製線程工廠,從而定製線程池中建立的線程,這樣能夠實現些擴展功能,如調試信息,設置UncaughtExceptionHandler等。
/**
 * 自定義的線程工廠
 */
public class MyThreadFactory implements ThreadFactory {
	private final String poolName;
	
	public MyThreadFactory(String poolName) {
		super();
		this.poolName = poolName;
	}

	@Override
	public Thread newThread(Runnable r) {
		return new MyAppThread(r);
	}
}

public class MyAppThread extends Thread {
	public static final String DEFAULT_NAME="MyAppThread";
	private static volatile boolean debugLifecycle = false;
	private static final AtomicInteger created = new AtomicInteger();
	private static final AtomicInteger alive = new AtomicInteger();
	private static final Logger log = Logger.getAnonymousLogger();
	
	public MyAppThread(Runnable r) {
		this(r, DEFAULT_NAME);
	}

	public MyAppThread(Runnable r, String name) {
		super(r, name+ "-" + created.incrementAndGet());
		setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器
				new Thread.UncaughtExceptionHandler() {
					@Override
					public void uncaughtException(Thread t, Throwable e) {
						log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
					}
				});
	}

	@Override
	public void run() {
		boolean debug = debugLifecycle;
		if (debug) 
			log.log(Level.FINE, "running thread " + getName());
		try {
			alive.incrementAndGet();
			super.run();
		} finally {
			alive.decrementAndGet();
			if (debug) 
				log.log(Level.FINE, "existing thread " + getName());
		}
	}
}
  • 當應用須要利用安全策略來控制某些特殊代碼庫的訪問權,能夠利用PrivilegedThreadFactory來定製本身的線程工廠,以避免出現安全性異常。

在調用構造函數後再定製ThreadPoolExecutor:

  • 能夠在建立線程池後,再經過Setter方法設置其基本屬性。

擴展ThreadPoolExecutor:

/**
 * 增長日誌和記時等功能的線程池
 */
public class TimingThreadPoolExecutor extends ThreadPoolExecutor {
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間
	private final Logger log = Logger.getAnonymousLogger();
	private final AtomicLong numTasks = new AtomicLong(); //統計任務數
	private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間

	public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		log.fine(String.format("Thread %s: start %s", t, r));
		startTime.set(System.nanoTime());
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		try{
			long endTime = System.nanoTime();
			long taskTime = endTime - startTime.get();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
		} finally{
			super.afterExecute(r, t);
		}
	}

	@Override
	protected void terminated() {
		try{
			//任務執行平均時間
			log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
		}finally{
			super.terminated();
		}
	}
}

遞歸算法的並行性:

書中的一個謎題實例: 函數

/**
 * 串行的謎題解答題
 */
public class SequentialPuzzleSolver<P, M> implements Puzzle<P, M>{
	private final Puzzle<P, M> puzzle;
	private final Set<P> seen = new HashSet<>();
	
	public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
		this.puzzle = puzzle;
	}

	public List<M> solve(){
		P pos = puzzle.initialPosition();
		return search(new Node<P, M>(pos, null, null));
	}
	
	private List<M> search(Node<P, M> node) {
		if (!seen.contains(node.pos)){
			seen.contains(node.pos);
			if (puzzle.isGoal(node.pos)){//找到了目標位置
					return node.asMoveList();
			}
			for (M move: puzzle.legalMoves(node.pos)){
				P pos = puzzle.move(node.pos, move);
				Node<P, M> child = new Node<P, M>(pos, move, node);
				List<M> result = search(child); //遞歸搜索
				if (result != null) 
					return result;
			}
		}
		return null;
	}
       ..,
}
上面解決器的併發實現:
/**
 * 併發的謎題解答器
 */
public class ConcurrentPuzzleSolver<P, M> {
	private final Puzzle<P, M> puzzle;
	private final ExecutorService exec;
	private final ConcurrentHashMap<P, Boolean> seen;
	final ValueLatch<Node<P, M>> solution
	                = new ValueLatch<Node<P, M>>(); //存放答案
       ...
	public List<M> solve() throws InterruptedException{
		P p = puzzle.initialPosition();
		exec.execute(newTask(p, null, null));
		//阻塞直到找到答案
		Node<P, M> solvNode = solution.getValue();
		return solvNode == null ? null : solvNode.asMoveList();
	}

	private Runnable newTask(P p, M m, Node<P, M> n) {
		return new SolverTask(p, m, n);
	}
	
	class SolverTask extends Node<P, M> implements Runnable {

		public SolverTask(P pos, M move, Node<P, M> prev) {
			super(pos, move, prev);
		}

		@Override
		public void run() {
			if (solution.isSet() //若已經找到了答案,阻止其餘線程繼續再找
					|| seen.putIfAbsent(pos, true) != null){
				return; 
			}
			if (puzzle.isGoal(pos)){  //找到了
				solution.setValue(this);
			} else{
				for (M m : puzzle.legalMoves(pos)){
					//繼續找
					exec.execute(newTask(puzzle.move(pos, m), m, this));
				}
			}
		}
	}
}

/**
 * 攜帶結果的閉鎖
 */
public class ValueLatch<T> {
	private T value = null;
	private final CountDownLatch done = new CountDownLatch(1);
	
	public T getValue() throws InterruptedException {
		done.await(); //阻塞直到設置了值
		synchronized (this) {
			return value;
		}
	}

	public boolean isSet() {
		return done.getCount() == 0;
	}

	public synchronized void setValue(T newValue) {
		if (!isSet()){
			value = newValue;
			done.countDown();
		}
	}
}

可是併發的解決器,對於未找到答案不是很好處理,能夠經過計數來實現: 性能

/**
 * 在解決器中找不到解答時
 */
public class PuzzleSolver<P, M> extends ConcurrentPuzzleSolver<P, M> {
	private final AtomicInteger taskCount = new AtomicInteger(); //經過計數來標誌是否找到解答
       ...
	@Override
	protected Runnable newTask(P p, M m, Node<P, M> n) {
		return new CountingSolverTask(p, m, n);
	}

	class CountingSolverTask extends SolverTask{
		
		public CountingSolverTask(P pos, M move, Node<P, M> prev) {
			super(pos, move, prev);
			taskCount.incrementAndGet();
		}

		@Override
		public void run() {
			try{
				super.run();
			} finally{
				if (taskCount.decrementAndGet() == 0){
					solution.setValue(null);
				}
			}
		}
	}
}

不吝指正。

相關文章
相關標籤/搜索