【轉】Spring線程及線程池的使用

spring @Async 線程池使用

最近公司項目正逐漸從dubbo向springCloud轉型,在本次新開發的需求中,所有使用springcloud進行,在使用時線程池,考慮使用spring封裝的線程池,現將本次使用心得及內容記錄下來html

1、線程池常規使用方式java

以前使用線程池的方式,都是本身定義線程池,而後寫多線程類,用線程池去調用,以下:spring

package cn.leadeon.message.client; import cn.leadeon.comm.log.Log; import cn.leadeon.message.req.MessageProducerReq; import lombok.Data; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 流量消息發送類,線程池調用 * * @author LiJunJun * @since 2018/9/30 */ @Data public class MessageClientSendMsg { /** * 日誌記錄器 */
    private static final Log LOGGER = new Log(MessageClientSendMsg.class); /** * 線程池 */
    private static ExecutorService threadPool; /** * trace */
    private String trace; /** * 手機號 */
    private String cellNum; /** * 消息實體 */
    private MessageProducerReq messageProducerReq; static { threadPool = Executors.newFixedThreadPool(10); } /** * 構造函數 * * @param trace 請求流水 * @param cellNum 電話號碼 * @param messageProducerReq 消息實體 */
    public MessageClientSendMsg(String trace, String cellNum, MessageProducerReq messageProducerReq) { this.trace = trace; this.cellNum = cellNum; this.messageProducerReq = messageProducerReq; } /** * 消息發送 */
    public void sendMsg() { SendMsgRunable sendMsgRunable = new SendMsgRunable(); threadPool.execute(sendMsgRunable); } /** * 發送消息內部類並處理異常,不能影響主線程的業務 */
    class SendMsgRunable implements Runnable { @Override public void run() { try { MessageClientProducer msgClintProducer = new MessageClientProducer(); msgClintProducer.sendAsyncWithPartition(trace, cellNum, messageProducerReq); } catch (Exception e) { LOGGER.error("消息發送失敗!,trace:" + trace); } } } }

2、使用spring的線程池編程

  • 線程池的啓用

  有兩種方式,配置文件或者註解多線程

  註解:使用@EnableAsync標註啓用spring線程池,@Async將方法標註爲異步方法,spring掃描到後,執行該方法時,會另起新線程去執行,很是簡單mvc

package cn.leadeon.message.test; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; /** * @author LiJunJun * @since 2018/10/11 */ @Component @EnableAsync public class AsyncTest { @Async public void test1() { System.out.println("異步執行test1!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } @Async public void test2() { System.out.println("異步執行test2!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } @Async public void test3() { System.out.println("異步執行test3!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } }

配置文件:新增spring的配置文件spring-threadpool.xml異步

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" default-autowire="byName">

    <description>流量消息spring線程池配置</description>

    <!-- 缺省的異步任務線程池 -->
    <task:annotation-driven executor="messageExecutor"/>
    <task:executor id="asyncExecutor" pool-size="100-10000" queue-capacity="10"/>

    <!-- 處理message的線程池 -->
    <task:executor id="messageExecutor" pool-size="15-50" queue-capacity="100" keep-alive="60" rejection-policy="CALLER_RUNS"/>

</beans>

 

使用註解引入配置文件或者在本身的spring配置文件中import便可async

複製代碼
package cn.leadeon.message.test; import org.springframework.context.annotation.ImportResource; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * @author LiJunJun * @since 2018/10/11 */ @Component @ImportResource("classpath:/config/spring-threadpool.xml") public class AsyncTest { @Async public void test1() { System.out.println("異步執行test1!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } @Async public void test2() { System.out.println("異步執行test2!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } @Async public void test3() { System.out.println("異步執行test3!!!"); System.out.println("線程id:" + Thread.currentThread().getId()); System.out.println("線程名稱:" + Thread.currentThread().getName()); } }
複製代碼

  配置文件能夠本身配置線程池的相關參數,本身能夠配置多個線程池,使用時,用@Async(value="beanId")區分便可ide

  注意點:
  @EnableAsync註解與<task:annotation-driven executor="messageExecutor"/>等價,二者只能使用其一,否則啓動會報錯函數

  • java編程方式配置自定義線程池,以下:
複製代碼
package cn.leadeon.message.base.threadpool; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 流量消息線程池配置 * * @author LiJunJun * @since 2018/10/10 */ @Configuration public class ThreadPoolConfiguration { /** * 核心線程數:線程池建立時候初始化的線程數 */ @Value("${executor.core.pool.size}") private int corePoolSize; /** * 最大線程數:線程池最大的線程數,只有在緩衝隊列滿了以後纔會申請超過核心線程數的線程 */ @Value("${executor.max.pool.size}") private int maxPoolSize; /** * 緩衝隊列200:用來緩衝執行任務的隊列 */ @Value("${executor.queue.capacity}") private int queueCapacity; /** * 容許線程的空閒時間(單位:秒):當超過了核心線程出以外的線程在空閒時間到達以後會被銷燬 */ @Value("${executor.keepalive.Seconds}") private int keepAliveSeconds; /** * 線程池名的前綴:設置好了以後能夠方便咱們定位處理任務所在的線程池 */ @Value("${executor.thread.name.prefix}") private String threadNamePrefix; @Bean public Executor MessageExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setThreadNamePrefix(threadNamePrefix); // 線程池對拒絕任務的處理策略:這裏採用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;若是執行程序已關閉,則會丟棄該任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
複製代碼
  • 測試
複製代碼
package cn.leadeon.message.test; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; /** * spring線程池單元測試 * * @author LiJunJun * @since 2018/10/11 */ public class TestSpringThreadPool extends JunitTestBase { @Autowired private AsyncTest asyncTest; /** * spring線程池單元測試 */ @Test public void testThreadPool() { System.out.println("主線程id:" + Thread.currentThread().getId()); System.out.println("主線程名稱:" + Thread.currentThread().getName()); asyncTest.test1(); asyncTest.test2(); asyncTest.test3(); } }
複製代碼

測試結果:主線程和異步方法分別使用了不一樣的線程去調用,測試完成

 ******************************

xml配置線程池的另外一種方式

<!-- 線程池配置 -->
    <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心線程數,默認爲1 -->
        <property name="corePoolSize" value="10" />
        <!--最大線程數,默認爲Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="50" />
        <!--隊列最大長度,通常須要設置值>=notifyScheduledMainExecutor.maxNum;默認爲Integer.MAX_VALUE -->
        <property name="queueCapacity" value="10000" />
        <!--線程池維護線程所容許的空閒時間,默認爲60s -->
        <property name="keepAliveSeconds" value="300" />
        <!--線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認爲後者 1. CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重複調用 execute() 方法,直到成功。 2. AbortPolicy :超過隊列最大數量後對拒絕的任務拋棄處理,而且拋出異常。 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" />
        </property>
    </bean>
相關文章
相關標籤/搜索