spring-線程池(1)

多線程併發處理起來一般比較麻煩,若是你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了java的多線程的實現,你只須要關注於併發事物的流程以及一些併發負載量等特性,具體來講如何使用spring來處理併發事務:java

1.瞭解 TaskExecutor接口spring

Spring的 TaskExecutor接口等同於java.util.concurrent.Executor接口 實際上,它存在的主要緣由是爲了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初建立TaskExecutor是爲了在須要時給其餘Spring組件提供一個線程池的抽 象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 固然,若是你的bean須要線程池行爲,你也可使用這個抽象層。多線程

2. TaskExecutor接口的實現類併發

(1)SimpleAsyncTaskExecutor 類app

這個實現不重用任何線程,或者說它每次調用都啓動一個新線程。可是,它仍是支持對併發總數設限,當超過線程併發總數限制時,阻塞新的調用,直到有位置被釋放。若是你須要真正的池,請繼續往下看。異步

(2)SyncTaskExecutor類工具

這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不須要多線程的時候,好比簡單的test case。測試

(3)ConcurrentTaskExecutor 類this

這個實現是對 Java 5 java.util.concurrent.Executor類的包裝。有另外一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數做爲bean屬性。不多須要使用 ConcurrentTaskExecutor, 可是若是ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另一個備選。spa

(4)SimpleThreadPoolTaskExecutor 類

這個實現其實是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命週期回調。當你有線程池,須要在Quartz和非Quartz組件中共用時,這是它的典型用處。

(5)ThreadPoolTaskExecutor 類

它不支持任何對 java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都採用了不一樣的包結構,致使它們沒法正確運行。 這個實現只能在Java 5環境中使用,可是倒是這個環境中最經常使用的。它暴露的bean properties能夠用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個 TaskExecutor中。若是你須要更加先進的類,好比ScheduledThreadPoolExecutor,咱們建議你使用 ConcurrentTaskExecutor來替代。

(6)TimerTaskExecutor類

這個實現使用一個TimerTask做爲其背後的實現。它和SyncTaskExecutor的不一樣在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。

(7)WorkManagerTaskExecutor類

這個實現使用了 CommonJ WorkManager做爲其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor相似,這個類實現了WorkManager接口, 所以能夠直接做爲WorkManager使用。 

 

3. 簡單hellword

 

3.1 線程類

public class MessagePrinterTask implements Runnable {
    private String message;

    public MessagePrinterTask() {

    }
    public MessagePrinterTask(String message) {
        this.message = message;
    }

    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(message);
    }
}

 

3.1 線程管理工具類

import org.springframework.core.task.TaskExecutor;

public class TaskExecutorUtil {
    
    private TaskExecutor taskExecutor;
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void printMessages(Runnable r,int i) {
            taskExecutor.execute(r);
            System.out.println("add Thread:"+i);
    }


}

 

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans  
xmlns="http://www.springframework.org/schema/beans"  
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
xmlns:context="http://www.springframework.org/schema/context"  
xsi:schemaLocation="  
http://www.springframework.org/schema/beans        
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
http://www.springframework.org/schema/context                
http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  <!-- 異步線程池 --> 
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <!-- 核心線程數  -->  
    <property name="corePoolSize" value="2" />  
    <!-- 最大線程數 -->  
    <property name="maxPoolSize" value="3" />  
    <!-- 隊列最大長度 >=mainExecutor.maxSize -->  
    <property name="queueCapacity" value="10" />  
    <!-- 線程池維護線程所容許的空閒時間 -->  
    <property name="keepAliveSeconds" value="300" />  
    <!-- 線程池對拒絕任務(無線程可用)的處理
    策略 --> <!-- 若不做該處理,當線程滿了,隊列滿了以後,繼續往下增長任務,則拋出異常,拒絕該任務 --> 
    <property name="rejectedExecutionHandler">  
      <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
    </property>
</bean>
  
<bean id="taskExecutorUtil" class="TaskExecutorUtil">    
    <!-- <constructor-arg ref="taskExecutor" />  --> 
    <property name="taskExecutor" ref="taskExecutor" /> 
</bean> 

<!-- 託管線程 -->
<bean id="messagePrinterTask" class="MessagePrinterTask">    
</bean> 

  
  
</beans>

 

測試

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");    
        TaskExecutorUtil te = (TaskExecutorUtil)appContext.getBean("taskExecutorUtil");   
        
        for (int i = 0; i < 25; i++) {
            MessagePrinterTask m=new MessagePrinterTask("Message" + i);
            te.printMessages(m,i); 
        } 
        System.out.println("11111111111111111111111"); 
    }
}

結果輸出

add Thread:0
add Thread:1
add Thread:2
add Thread:3
add Thread:4
add Thread:5
add Thread:6
add Thread:7
add Thread:8
add Thread:9
add Thread:10
add Thread:11
add Thread:12
Message1
Message0
Message13
add Thread:13
Message12
add Thread:14
add Thread:15
add Thread:16
Message2
Message4
Message17
add Thread:17
Message3
add Thread:18
add Thread:19
add Thread:20
Message5
Message7
Message21
add Thread:21
Message6
add Thread:22
add Thread:23
add Thread:24
11111111111111111111111
Message9
Message8
Message10
Message14
Message15
Message11
Message18
Message19
Message16
Message22
Message23
Message20
Message24

 

解釋:

1.由於線程類睡眠5秒,因此在5秒前add Thread增長了13個線程(3個最大線程+10個在隊列中等候),因爲在作了對了滿了的策略(rejectedExecutionHandler),因此不會拋出異常,

其餘線程等候加入隊列

2.五秒後打印,而後繼續addThread,打印message,注意當前最大線程執行是3個,,其他在隊列等候,而後打印剩餘線程,111111111111是主線程

 

 

四、配置解釋
當一個任務經過execute(Runnable)方法欲添加到線程池時:
一、 若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。
二、 若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。
三、若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
四、 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。也就是:處理任務的優先級爲:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。
五、 當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。

 

 

 

 

5. 線程類爲 java.util.concurrent.ThreadPoolExecutor:

線程池類爲 java.util.concurrent.ThreadPoolExecutor,經常使用構造方法爲:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所容許的空閒時間
unit: 線程池維護線程所容許的空閒時間的單位
workQueue: 線程池所使用的緩衝隊列
handler: 線程池對拒絕任務的處理策略

一個任務經過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。

當一個任務經過execute(Runnable)方法欲添加到線程池時:

若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。

若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。

若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。

若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。

也就是:處理任務的優先級爲:
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。

當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。

unit可選的參數爲java.util.concurrent.TimeUnit中的幾個靜態屬性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我經常使用的是:java.util.concurrent.ArrayBlockingQueue

handler有四個選擇:
ThreadPoolExecutor.AbortPolicy()
拋出java.util.concurrent.RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy()
重試添加當前的任務,他會自動重複調用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務
ThreadPoolExecutor.DiscardPolicy()
拋棄當前的任務
相關文章
相關標籤/搜索