Java線程池使用和經常使用參數

多線程問題:java

 

一、java中爲何要使用多線程
使用多線程,能夠把一些大任務分解成多個小任務來執行,多個小任務之間互不影像,同時進行,這樣,充分利用了cpu資源。spring


二、java中簡單的實現多線程的方式多線程

繼承Thread類,重寫run方法;併發

class MyTread extends Thread{

public void run() {
  System.out.println(Thread.currentThread().getName());
}

}
實現Runable接口,實現run方法;
class MyRunnable implements Runnable{

  public void run() {

    System.out.println(Thread.currentThread().getName());
  }
}
class ThreadTest {

  public static void main(String[] args) {

    MyTread thread = new Mythread();
    thread.start();	//開啓一個線程

    MyRunnable myRunnable = new MyRunnable();
    Thread runnable = new Thread(myRunnable);
    runnable.start();	//開啓一個線程

  }
}

  

三、java線程的狀態
建立:當new了一個線程,並無調用start以前,線程處於建立狀態;
就緒:當調用了start以後,線程處於就緒狀態,這是,線程調度程序尚未設置執行當前線程;
運行:線程調度程序執行到線程時,當前線程從就緒狀態轉成運行狀態,開始執行run方法裏邊的代碼;
阻塞:線程在運行的時候,被暫停執行(一般等待某項資源就緒後在執行,sleep、wait能夠致使線程阻塞),這是該線程處於阻塞狀態;
死亡:當一個線程執行完run方法裏邊的代碼或調用了stop方法後,該線程結束運行ide


四、爲何要引入線程池
當咱們須要的併發執行線程數量不少時,且每一個線程執行很短的時間就結束了,這樣,咱們頻繁的建立、銷燬線程就大大下降了工做效率(建立和銷燬線程須要時間、資源)。
java中的線程池能夠達到這樣的效果:一個線程執行完任務以後,繼續去執行下一個任務,不被銷燬,這樣線程利用率提升了。性能


五、java中的線程池(ThreadPoolExecutor)
提及java中的線程池,就想到java.util.concurrent.ThreadPoolExecutor。ThreadPoolExecutor類是java線程池中的核心類。他的實現方式有四種:學習

public class ThreadPoolExecutor extends AbstractExecutorService {
  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
  }

   public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    threadFactory, defaultHandler);
  }

  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), handler);
  }

  public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

  


經過ThreadPoolExecutor類的源碼能夠看出,ThreadPoolExecutor類繼承AbstractExecutorService,提供四個構造方法,經過構造方法能夠看出前面三個最終掉了最後一個
下面介紹下構造方法中的參數:優化


corePoolSize:線程池的大小。線程池建立以後不會當即去建立線程,而是等待線程的到來。噹噹前執行的線程數大於改值是,線程會加入到緩衝隊列;
maximumPoolSize:線程池中建立的最大線程數;
keepAliveTime:空閒的線程多久時間後被銷燬。默認狀況下,改值在線程數大於corePoolSize時,對超出corePoolSize值得這些線程起做用。
unit:TimeUnit枚舉類型的值,表明keepAliveTime時間單位,能夠取下列值:
TimeUnit.DAYS; //天
  TimeUnit.HOURS; //小時
  TimeUnit.MINUTES; //分鐘
  TimeUnit.SECONDS; //秒
  TimeUnit.MILLISECONDS; //毫秒
  TimeUnit.MICROSECONDS; //微妙
  TimeUnit.NANOSECONDS; //納秒
workQueue:阻塞隊列,用來存儲等待執行的任務,決定了線程池的排隊策略,有如下取值:
  ArrayBlockingQueue;
  LinkedBlockingQueue;
  SynchronousQueue;
  threadFactory:線程工廠,是用來建立線程的。默認new Executors.DefaultThreadFactory();
handler:線程拒絕策略。當建立的線程超出maximumPoolSize,且緩衝隊列已滿時,新任務會拒絕,有如下取值:
  ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
  ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
  ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
  ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務this


如下是具體的實現方式:spa

//默認策略。使用該策略時,若是線程池隊列滿了丟掉這個任務而且拋出RejectedExecutionException異常
class AbortPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    throw new RejectedExecutionException("Task " + r.toString() + 
    " rejected from " + 
    executor.toString()); 
  }
}
//若是線程池隊列滿了,會直接丟掉這個任務而且不會有任何異常
class DiscardPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

  }
}
//丟棄最老的,會將最先進入隊列的任務刪掉騰出空間,再嘗試加入隊列
class DiscardOldestPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) { 
      //移除隊頭元素 
      executor.getQueue().poll(); 
    //再嘗試入隊 
      executor.execute(r); 
    } 
  }
}
//主線程會本身去執行該任務,不會等待線程池中的線程去執行
class CallerRunsPolicy implements RejectedExecutionHandler{

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) { 
      //直接執行run方法 
      r.run(); 
    } 
  }
}

  

如下是ThreadPoolExecutor具體的繼承結構

public abstract class AbstractExecutorService implements ExecutorService {

}


這是一個抽象類,實現了ExecutorService接口,並實現了ExecutorService裏邊的方法,下面看下ExecutorService接口的具體實現

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException, ExecutionException, TimeoutException;
}

  

ExecutorService繼承Executor接口,下面是Executor接口的具體實現

public interface Executor {
  void execute(Runnable command);
}


Executor接口是頂層接口,只聲明瞭一個execute方法,該方法是用來執行傳遞進來的任務的。
回過頭來,咱麼從新看ThreadPoolExecutor類,改類裏邊有如下兩個重要的方法:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
  }else if (!addWorker(command, false))
    reject(command);
}
public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

  

execute()方法是Executor中聲明的方法,在ThreadPoolExecutor有了具體的實現,這個方法是ThreadPoolExecutor的核心方法,
經過這個方法能夠向線程池提交一個任務,交由線程池去執行
submit()方法是ExecutorService中聲明的方法,在AbstractExecutorService中進行了實現,Executor中並無對其進行重寫。從實現中能夠看出,submit方法最終也調用了execute
方法,也是執行一我的去,但submit方法能夠返回執行結果,利用Future來獲取任務執行結果。


六、Spring中的線程池
Spring中的線程池是由ThreadPoolTaskExecutor類來實現的。該類的實現原理最終也是調用了java中的ThreadPoolExecutor類中的一些方法。具體的實現讀者能夠本身去翻閱Spring
的源碼,這裏筆者就不羅列了。咱們看下ThreadPoolTaskExecutor的初始化。
ThreadPoolTaskExecutor有兩種經常使用的有兩種初始化方式:xml配置,java代碼初始化
xml配置: 

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="keepAliveSeconds" value="200" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="20" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>


看過上面的內容,讀者應該很清楚上面的一些參數表明的意思了吧。筆者在這裏不一一去解釋了。

public MyThreadPoolTaskExecutor {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;

 private void test(){
  taskExecutor.execute(new Runnable(){
    @Override
    public void run() {
     //執行的代碼
    }});
  }
}

  


Java代碼初始化:

private void test2(){
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(10);
  executor.setMaxPoolSize(15);
  executor.setKeepAliveSeconds(1);
  executor.setQueueCapacity(5);
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.initialize();
  executor.execute(new Runnable(){
    @Override
    public void run() {
      //執行的代碼
    }
  });
}

  

 

經常使用參數總結:

關於Java線程池的參數設置: 線程池是Java多線程裏開發裏的重要內容,使用難度不大,但如何用好就要明白參數的含義和如何去設置。乾貨裏的內容大可能是參考別人的,加入了一些知識點的擴充和見解。但願能對多線程開發學習的童鞋有些啓發和幫助。

1、ThreadPoolExecutor的重要參數   

一、corePoolSize:核心線程數
        * 核心線程會一直存活,及時沒有任務須要執行
        * 當線程數小於核心線程數時,即便有線程空閒,線程池也會優先建立新線程處理
        * 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉

    二、queueCapacity:任務隊列容量(阻塞隊列)
        * 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行

    三、maxPoolSize:最大線程數
        * 當線程數>=corePoolSize,且任務隊列已滿時。線程池會建立新線程來處理任務
        * 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常

    四、 keepAliveTime:線程空閒時間
        * 當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
        * 若是allowCoreThreadTimeout=true,則會直到線程數量=0

    五、allowCoreThreadTimeout:容許核心線程超時
    六、rejectedExecutionHandler:任務拒絕處理器
        * 兩種狀況會拒絕處理任務:
            - 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
            - 當線程池被調用shutdown()後,會等待線程池裏的任務執行完畢,再shutdown。若是在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
        * 線程池會調用rejectedExecutionHandler來處理這個任務。若是沒有設置默認是AbortPolicy,會拋出異常
        * ThreadPoolExecutor類有幾個內部實現類來處理這類狀況:
            - AbortPolicy 丟棄任務,拋運行時異常
            - CallerRunsPolicy 執行任務
            - DiscardPolicy 忽視,什麼都不會發生
            - DiscardOldestPolicy 從隊列中踢出最早進入隊列(最後一個執行)的任務
        * 實現RejectedExecutionHandler接口,可自定義處理器

2、ThreadPoolExecutor執行順序       

線程池按如下行爲執行任務
    1. 當線程數小於核心線程數時,建立線程。
    2. 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
    3. 當線程數大於等於核心線程數,且任務隊列已滿
        - 若線程數小於最大線程數,建立線程
        - 若線程數等於最大線程數,拋出異常,拒絕任務

3、如何設置參數

    一、默認值
        * corePoolSize=1
        * queueCapacity=Integer.MAX_VALUE
        * maxPoolSize=Integer.MAX_VALUE
        * keepAliveTime=60s
        * allowCoreThreadTimeout=false
        * rejectedExecutionHandler=AbortPolicy()

    二、如何來設置
        * 須要根據幾個值來決定
            - tasks :每秒的任務數,假設爲500~1000
            - taskcost:每一個任務花費時間,假設爲0.1s
            - responsetime:系統容許容忍的最大響應時間,假設爲1s
        * 作幾個計算
            - corePoolSize = 每秒須要多少個線程處理? 
                * threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
                * 根據8020原則,若是80%的每秒任務數小於800,那麼corePoolSize設置爲80便可
            - queueCapacity = (coreSizePool/taskcost)*responsetime
                * 計算可得 queueCapacity = 80/0.1*1 = 800。意思是隊列裏的線程能夠等待1s,超過了的須要新開線程來執行
                * 切記不能設置爲Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
            - maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
                * 計算可得 maxPoolSize = (1000-800)/10 = 20
                * (最大任務數-隊列容量)/每一個線程每秒處理能力 = 最大線程數
            - rejectedExecutionHandler:根據具體狀況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
            - keepAliveTime和allowCoreThreadTimeout採用默認一般能知足

    三、 以上都是理想值,實際狀況下要根據機器性能來決定。若是在未達到最大線程數的狀況機器cpu load已經滿了,則須要經過升級硬件(呵呵)和優化代碼,下降taskcost來處理。

設置。

相關文章
相關標籤/搜索