Java裏阻塞線程的三種實現方法

在平常開發中,咱們有時會遇到遇到多線程處理任務的狀況,JDK裏提供了便利的ThreadPoolExecutor以及其包裝的工具類Executors。可是咱們知道ExecutorService.excute(Runnable r)是異步的,超過線程池處理能力的線程會被加入到執行隊列裏。有時候爲了保證任務提交的順序性,咱們不但願有這個執行隊列,在線程池滿的時候,則把主線程阻塞。那麼,怎麼實現呢?html

最直接的想法是繼承ThreadPoolExecutor,重載excute()方法,加入線程池是否已滿的檢查,若線程池已滿,則等待直到上一個任務執行完畢。這裏ThreadPoolExecutor提供了一個afterExecute(Runnable r, Throwable t)方法,每一個任務執行結束時會調用這個方法。 同時,咱們會用到concurrent包的ReentrantLock以及Condition.wait/notify方法。如下是實現代碼(代碼來自:http://www.cnblogs.com/steeven/archive/2005/12/08/293219.html):java

<!-- lang: java -->
private ReentrantLock pauseLock = new ReentrantLock();
 private Condition unpaused = pauseLock.newCondition();
 @Override
 public void execute(Runnable command) {
  pauseLock.lock();
  try {
   while (getPoolSize()==getMaximumPoolSize() && getQueue().remainingCapacity()==0)
    unpaused.await();
   super.execute(command);//放到lock外面的話,在壓力測試下會有漏網的!
  } catch (InterruptedException e) {
   log.warn(this, e);
  } finally {
   pauseLock.unlock();
  }
 }
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
  super.afterExecute(r,t);
  try{
   pauseLock.lock();
   unpaused.signal();
  }finally{
   pauseLock.unlock();
  }
 }

固然,有些熟悉JDK源碼的人會說,本身實現這個太費勁了,不喜歡!有沒有比較簡單的方法呢?多線程

這裏介紹一下vela同窗的方法: http://vela.diandian.com/post/2012-07-24/40031283329異步

研究ThreadPoolExecutor.excute()源碼會發現,它調用了BlockingQueue.offer()來實現多餘任務的入隊。BlockingQueue有兩個方法:BlockingQueue.offer()和BlockingQueue.put(),前者在隊列滿時不阻塞,直接失敗,後者在隊列滿時阻塞。那麼,問題就很簡單了,繼承某個BlockingQueue,而後將offer()重寫,改爲調用put()就搞定了!最短的代碼量,也能起到很好的效果哦!ide

<!-- lang: java -->
package com.diandian.framework.concurrent;
               
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
               
public class ExecutorsEx {
               
    /**
     * 建立一個堵塞隊列
     * 
     * @param threadSize
     * @return
     */
    public static ExecutorService newFixedThreadPool(int threadSize) {
        return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1) {
               
                    private static final long serialVersionUID = -9028058603126367678L;
               
                    @Override
                    public boolean offer(Runnable e) {
                        try {
                            put(e);
                            return true;
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                        return false;
                    }
                });
    }
}

固然這個方法有一點讓人不快的地方,由於它與咱們熟知的OO基本原則之一--里氏替換原則衝突了,即子類的方法與父類的方法有不一樣的行爲。畢竟都是實現了BlockingQueue接口,offer()方法的行爲被改變了。雖然只是一個匿名類,可是對於某些OOP的擁躉來講總有些不爽的地方吧!函數

不要緊,咱們還有JDK默認的解決方法:使用RejectedExecutionHandler。當ThreadPoolExecutor.excute執行失敗時,會調用的RejectedExecutionHandler,這就是ThreadPoolExecutor的可定製的失敗策略機制。JDK默認提供了4種失敗策略: AbortPolicy(停止)、CallersRunPolicy(調用者運行)、DiscardPolicy(丟棄)、DiscardOldestPolicy(丟棄最舊的)。 其中值得說的是CallersRunPolicy,它會在excute失敗後,嘗試使用主線程(就是調用excute方法的線程)去執行它,這樣就起到了阻塞的效果!因而一個完徹底全基於JDK的方法誕生了:工具

<!-- lang: java -->
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
	return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
			new ThreadPoolExecutor.CallerRunsPolicy());
}

固然這個方法有一個問題:這樣加上主線程,老是會比參數的size線程多上一個。要麼在函數開始就把size-1,要麼,咱們能夠嘗試本身實現一個RejectedExecutionHandler:post

<!-- lang: java -->
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
	return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
			new RejectedExecutionHandler() {

				@Override
				public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
					try {
						executor.getQueue().put(r);
					} catch (InterruptedException e) {
						throw new RuntimeException(e);
					}
				}
			});
}

怎麼樣,這下是否是感受挺好了呢?測試


2013年9月22日更新:this

事實證實,除了JDK的CallerRunsPolicy方案,其餘的方案都存在一個隱患:

若是線程仍在執行,此時顯式調用ExecutorService.shutdown()方法,會由於還有一個線程阻塞沒有入隊,而此時線程已經中止了,而這個元素纔剛剛入隊,最終會致使RejectedExecutionException

相關文章
相關標籤/搜索