Spring中線程池的應用

多線程併發處理起來一般比較麻煩,若是你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了Java的多線程的實現,你只須要關注於併發事物的流程以及一些併發負載量等特性,具體來講如何使用spring來處理併發事務:html

 

1.瞭解 TaskExecutor接口java

Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要緣由是爲了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初建立TaskExecutor是爲了在須要時給其餘Spring組件提供一個線程池的抽象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 固然,若是你的bean須要線程池行爲,你也可使用這個抽象層。spring

 

2. TaskExecutor接口的實現類安全

(1)SimpleAsyncTaskExecutor 類多線程

這個實現不重用任何線程,或者說它每次調用都啓動一個新線程。可是,它仍是支持對併發總數設限,當超過線程併發總數限制時,阻塞新的調用,直到有位置被釋放。若是你須要真正的池,請繼續往下看。併發

 

(2)SyncTaskExecutor類app

這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不須要多線程的時候,好比簡單的test case。異步

 

(3)ConcurrentTaskExecutor 類ide

這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另外一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數做爲bean屬性。不多須要使用ConcurrentTaskExecutor, 可是若是ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另一個備選。單元測試

(4)SimpleThreadPoolTaskExecutor 類

這個實現其實是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命週期回調。當你有線程池,須要在Quartz和非Quartz組件中共用時,這是它的典型用處。

(5)ThreadPoolTaskExecutor 類

它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都採用了不一樣的包結構,致使它們沒法正確運行。 這個實現只能在Java 5環境中使用,可是倒是這個環境中最經常使用的。它暴露的bean properties能夠用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。若是你須要更加先進的類,好比ScheduledThreadPoolExecutor,咱們建議你使用ConcurrentTaskExecutor來替代。

(6)TimerTaskExecutor類

這個實現使用一個TimerTask做爲其背後的實現。它和SyncTaskExecutor的不一樣在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。

(7)WorkManagerTaskExecutor類

這個實現使用了CommonJ WorkManager做爲其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor相似,這個類實現了WorkManager接口,所以能夠直接做爲WorkManager使用。  

 

3.線程池Demo之 ThreadPoolTaskExecutor

(1)編寫測試

  1. import org.springframework.core.task.TaskExecutor;  
  2.   
  3. public class MainExecutor {  
  4.     private TaskExecutor taskExecutor;  
  5.     public MainExecutor (TaskExecutor taskExecutor) {      
  6.         this.taskExecutor = taskExecutor;    
  7.     }  
  8.     public void printMessages() {      
  9.         for(int i = 0; i < 25; i++) {        
  10.             taskExecutor.execute(new MessagePrinterTask("Message" + i));      
  11.         }    
  12.     }  
  13.       
  14.       
  15.     private class MessagePrinterTask implements Runnable {      
  16.         private String message;      
  17.         public MessagePrinterTask(String message) {        
  18.             this.message = message;      
  19.         }      
  20.         public void run() {        
  21.             System.out.println(message);      
  22.         }  
  23.     }  
  24. }  

在業務代碼中,一般以for循環的方式執行多個事務
for(int k = 0; k < n; k++) {    
        taskExecutor.execute(new ThreadTransCode());    
    }
其它繁瑣的線程管理的事情就交給執行器去管理。
值得注意的事有兩點
1, taskExecutor.execute(new ThreadTransCode());  激活的線程都是守護線程,主線程結束,守護線程就會放棄執行,這個在業務中式符合邏輯的,在單元測試中爲了看到執行效果,須要自行阻塞主線程。
2, taskExecutor.execute(new ThreadTransCode());    的執行也不是徹底安全的,在執行的過程當中可能會由於須要的線程查過了線程隊列的容量而拋出運行時異常,若有必要須要捕獲。


(2)spring的配置

 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "/spring-beans.dtd">  
  3. <beans>  
  4.   
  5.   <!-- 異步線程池 -->  
  6.   <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
  7.     <!-- 核心線程數  -->  
  8.     <property name="corePoolSize" value="10" />  
  9.     <!-- 最大線程數 -->  
  10.     <property name="maxPoolSize" value="50" />  
  11.     <!-- 隊列最大長度 >=mainExecutor.maxSize -->  
  12.     <property name="queueCapacity" value="1000" />  
  13.     <!-- 線程池維護線程所容許的空閒時間 -->  
  14.     <property name="keepAliveSeconds" value="300" />  
  15.     <!-- 線程池對拒絕任務(無線程可用)的處理策略 -->  
  16.     <property name="rejectedExecutionHandler">  
  17.       <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
  18.     </property>  
  19.   </bean>  
  20.   
  21.   <bean id="mainExecutor" class="supben.MainExecutor">  
  22.     <property name="threadPool" ref="threadPool" />   
  23.   </bean>  
  24.   
  25.   <bean id="springScheduleExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask">  
  26.     <property name="runnable" ref="mainExecutor" />  
  27.     <!-- 容器加載10秒後開始執行 -->  
  28.     <property name="delay" value="10000" />  
  29.     <!-- 每次任務間隔 5秒-->  
  30.     <property name="period" value="5000" />  
  31.   </bean>  
  32.     
  33.     <bean id="springScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">  
  34.     <property name="scheduledExecutorTasks">  
  35.       <list>  
  36.         <ref bean="springScheduleExecutorTask" />  
  37.       </list>  
  38.     </property>  
  39.   </bean>  
  40.   
  41. </beans>  

(3)調用

 
  1. ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");     
  2. MainExecutor te = (MainExecutor)appContext.getBean("taskExecutorExample");  
  3. te.printMessages();  
  4. System.out.println("11111111111111111111111");  

(4)效果

 

 

 

案例:

 1 @EnableScheduling
 2 @Configuration
 3 public class WebConfig extends SafWebMvcConfigurerAdapter {
 4 
 5   、、、、
 6 
 7 
 8     @Bean
 9     public TaskExecutor taskExecutor() {
10         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
11         executor.setCorePoolSize(5);
12         executor.setMaxPoolSize(10);
13         return executor;
14     }
15 
16 }
 1  @Autowired
 2     private TaskExecutor taskExecutor;
 3 
 4 
 5     void save2AllTenantSetting(final ForbidScope forbidScope) {
 6         taskExecutor.execute(new Runnable() {
 7             @Override
 8             public void run() {
 9                 final List<Tenant> tenants = tenantService.findAll();
10 
11                 for (Tenant tenant : tenants) {
12                     Runnable save2TenantSettingRunnable
13                             = new Save2TenantSettingRunnable(tenant, forbidScope);
14                     taskExecutor.execute(save2TenantSettingRunnable);
15                 }
16             }
17         });
18     }

 案例:

首先要將TaskExecutor加入spring容器進行管理

 1 @Configuration
 2 public class WebMvcConfigurerAdpter extends AbstractWebMvcConfigurerAdpter {
 3 
 4     @Override
 5     public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
 6         super.configureMessageConverters(converters);
 7         WafJsonMapper.getMapper().enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
 8     }
 9 
10     @Bean
11     public TaskExecutor taskExecutor() {
12         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
13         executor.setCorePoolSize(5);
14         executor.setMaxPoolSize(10);
15         return executor;
16     }
17 
18     @Bean
19     public static PropertySourcesPlaceholderConfigurer propertyConfig() {
20         return new PropertySourcesPlaceholderConfigurer();
21     }
22 }

使用:

 

 1 @RestController
 2 @RequestMapping(value = "/v0.1/tasks")
 3 public class TaskController {
 4 
 5     @Autowired
 6     private TaskService taskService;
 7 
 8     @RequestMapping()
 9     public Object execute() {
10         taskService.execute();
11         Map res = new HashMap();
12         res.put("result", "success");
13         return res;
14     }
15 }
@Service
public class TaskService {

    @Autowired
    private TaskExecutor executor;

    public void execute() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(1000);
                        System.out.println("task running ...");
                    } catch (Exception e) {

                    }
                }
            }
        });
    }
}

每次調用/v0.1/tasks 接口時, 不用等到任務執行結束後纔會響應,而是響應後,任務還可能在執行 -- 異步而非同步

相關文章
相關標籤/搜索