1. 依賴性任務。提交的任務須要依賴其餘任務,此時須要當心維護這些執行策略以免產生活躍性問題。 html
2. 使用線程封閉機制的任務。任務要求其執行所在的Executor是線程安全的。 java
3. 對響應時間敏感的任務。 node
4. 使用ThreadLocal的任務。ThreadLocal使每一個線程均可以擁有某個變量的一個私有"版本"。 算法
/** * 在單線程Executor中任務發生死鎖 */ public class ThreadDeadLock { ExecutorService exec = Executors.newSingleThreadExecutor(); public class RenderPageTask implements Callable<String>{ @Override public String call() throws Exception { Future<String> header, footer; //頁眉, 頁腳 header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); String body = renderBody(); //有可能發生死鎖---任務等待子任務完成 return header.get() + body + footer.get(); } ... } }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
1. AbortPolicy(停止策略),默認的飽和策略。會拋出RejectedExecutionException異常。 安全
2. DiscardPolicy(拋棄策略): 會拋棄該任務。 網絡
3. DiscardOldestPolicy:會拋棄下一個將被執行的任務,而後嘗試從新提交新的任務。最好不和優先級隊列一塊兒使用,由於它會拋棄優先級最高的任務。 併發
4. CallerRunsPolicy(調用者運行策略):將任務回退給調用者。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用execute的線程中執行該任務。 ide
/** * 建立一個固定大小的線程池, * 並採用有界隊列與"調用者運行"飽和策略 */ public void intThreadPool() { ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY)); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); }
/** * 使用Semaphore來控制任務的提交速率 */ public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command){ try { semaphore.acquire(); //提交任務前請求信號量 exec.execute(new Runnable() { @Override public void run() { try{ command.run(); } finally{ semaphore.release(); //執行完釋放信號 } } }); } catch (InterruptedException e) { // handle exception } } }
/** * 自定義的線程工廠 */ public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { super(); this.poolName = poolName; } @Override public Thread newThread(Runnable r) { return new MyAppThread(r); } } public class MyAppThread extends Thread { public static final String DEFAULT_NAME="MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable r, String name) { super(r, name+ "-" + created.incrementAndGet()); setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器 new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } @Override public void run() { boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "running thread " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "existing thread " + getName()); } } }
/** * 增長日誌和記時等功能的線程池 */ public class TimingThreadPoolExecutor extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間 private final Logger log = Logger.getAnonymousLogger(); private final AtomicLong numTasks = new AtomicLong(); //統計任務數 private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間 public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); } finally{ super.afterExecute(r, t); } } @Override protected void terminated() { try{ //任務執行平均時間 log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get())); }finally{ super.terminated(); } } }
書中的一個謎題實例: 函數
/** * 串行的謎題解答題 */ public class SequentialPuzzleSolver<P, M> implements Puzzle<P, M>{ private final Puzzle<P, M> puzzle; private final Set<P> seen = new HashSet<>(); public SequentialPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; } public List<M> solve(){ P pos = puzzle.initialPosition(); return search(new Node<P, M>(pos, null, null)); } private List<M> search(Node<P, M> node) { if (!seen.contains(node.pos)){ seen.contains(node.pos); if (puzzle.isGoal(node.pos)){//找到了目標位置 return node.asMoveList(); } for (M move: puzzle.legalMoves(node.pos)){ P pos = puzzle.move(node.pos, move); Node<P, M> child = new Node<P, M>(pos, move, node); List<M> result = search(child); //遞歸搜索 if (result != null) return result; } } return null; } .., }上面解決器的併發實現:
/** * 併發的謎題解答器 */ public class ConcurrentPuzzleSolver<P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentHashMap<P, Boolean> seen; final ValueLatch<Node<P, M>> solution = new ValueLatch<Node<P, M>>(); //存放答案 ... public List<M> solve() throws InterruptedException{ P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); //阻塞直到找到答案 Node<P, M> solvNode = solution.getValue(); return solvNode == null ? null : solvNode.asMoveList(); } private Runnable newTask(P p, M m, Node<P, M> n) { return new SolverTask(p, m, n); } class SolverTask extends Node<P, M> implements Runnable { public SolverTask(P pos, M move, Node<P, M> prev) { super(pos, move, prev); } @Override public void run() { if (solution.isSet() //若已經找到了答案,阻止其餘線程繼續再找 || seen.putIfAbsent(pos, true) != null){ return; } if (puzzle.isGoal(pos)){ //找到了 solution.setValue(this); } else{ for (M m : puzzle.legalMoves(pos)){ //繼續找 exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } } } /** * 攜帶結果的閉鎖 */ public class ValueLatch<T> { private T value = null; private final CountDownLatch done = new CountDownLatch(1); public T getValue() throws InterruptedException { done.await(); //阻塞直到設置了值 synchronized (this) { return value; } } public boolean isSet() { return done.getCount() == 0; } public synchronized void setValue(T newValue) { if (!isSet()){ value = newValue; done.countDown(); } } }
可是併發的解決器,對於未找到答案不是很好處理,能夠經過計數來實現: 性能
/** * 在解決器中找不到解答時 */ public class PuzzleSolver<P, M> extends ConcurrentPuzzleSolver<P, M> { private final AtomicInteger taskCount = new AtomicInteger(); //經過計數來標誌是否找到解答 ... @Override protected Runnable newTask(P p, M m, Node<P, M> n) { return new CountingSolverTask(p, m, n); } class CountingSolverTask extends SolverTask{ public CountingSolverTask(P pos, M move, Node<P, M> prev) { super(pos, move, prev); taskCount.incrementAndGet(); } @Override public void run() { try{ super.run(); } finally{ if (taskCount.decrementAndGet() == 0){ solution.setValue(null); } } } } }
不吝指正。