spring @Async 線程池使用

最近公司項目正逐漸從dubbo向springCloud轉型,在本次新開發的需求中,所有使用springcloud進行,在使用時線程池,考慮使用spring封裝的線程池,現將本次使用心得及內容記錄下來java

 

1、線程池常規使用方式spring

以前使用線程池的方式,都是本身定義線程池,而後寫多線程類,用線程池去調用,以下:編程

package cn.leadeon.message.client;

import cn.leadeon.comm.log.Log;
import cn.leadeon.message.req.MessageProducerReq;
import lombok.Data;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 流量消息發送類,線程池調用
 *
 * @author LiJunJun
 * @since 2018/9/30
 */
@Data
public class MessageClientSendMsg {

    /**
     * 日誌記錄器
     */
    private static final Log LOGGER = new Log(MessageClientSendMsg.class);

    /**
     * 線程池
     */
    private static ExecutorService threadPool;

    /**
     * trace
     */
    private String trace;

    /**
     * 手機號
     */
    private String cellNum;

    /**
     * 消息實體
     */
    private MessageProducerReq messageProducerReq;

    static {
        threadPool = Executors.newFixedThreadPool(10);
    }

    /**
     * 構造函數
     *
     * @param trace 請求流水
     * @param cellNum 電話號碼
     * @param messageProducerReq 消息實體
     */
    public MessageClientSendMsg(String trace, String cellNum, MessageProducerReq messageProducerReq) {

        this.trace = trace;
        this.cellNum = cellNum;
        this.messageProducerReq = messageProducerReq;
    }

    /**
     * 消息發送
     */
    public void sendMsg() {

        SendMsgRunable sendMsgRunable = new SendMsgRunable();

        threadPool.execute(sendMsgRunable);
    }

    /**
     * 發送消息內部類並處理異常,不能影響主線程的業務
     */
    class SendMsgRunable implements Runnable {

        @Override
        public void run() {

            try {
                MessageClientProducer msgClintProducer = new MessageClientProducer();
                msgClintProducer.sendAsyncWithPartition(trace, cellNum, messageProducerReq);
            } catch (Exception e) {
                LOGGER.error("消息發送失敗!,trace:" + trace);
            }
        }
    }
}

2、使用spring的線程池多線程

  • 線程池的啓用

  有兩種方式,配置文件或者註解mvc

  註解:使用@EnableAsync標註啓用spring線程池,@Async將方法標註爲異步方法,spring掃描到後,執行該方法時,會另起新線程去執行,很是簡單異步

package cn.leadeon.message.test;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;

/**
 * @author LiJunJun
 * @since 2018/10/11
 */
@Component
@EnableAsync public class AsyncTest {

 @Async public void test1() {

        System.out.println("異步執行test1!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());

    }

    @Async
    public void test2() {

        System.out.println("異步執行test2!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());
    }

    @Async
    public void test3() {

        System.out.println("異步執行test3!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());
    }
}

配置文件:新增spring的配置文件spring-threadpool.xmlasync

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
     http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"
       default-autowire="byName">

    <description>流量消息spring線程池配置</description>

    <!-- 缺省的異步任務線程池 -->
    <task:annotation-driven executor="messageExecutor"/>
    <task:executor id="asyncExecutor" pool-size="100-10000" queue-capacity="10"/>

    <!-- 處理message的線程池 -->
    <task:executor id="messageExecutor" pool-size="15-50" queue-capacity="100" keep-alive="60"
                   rejection-policy="CALLER_RUNS"/>

</beans>


使用註解引入配置文件或者在本身的spring配置文件中import便可ide

package cn.leadeon.message.test;

import org.springframework.context.annotation.ImportResource;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author LiJunJun
 * @since 2018/10/11
 */
@Component
@ImportResource("classpath:/config/spring-threadpool.xml") public class AsyncTest {

    @Async
    public void test1() {

        System.out.println("異步執行test1!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());

    }

    @Async
    public void test2() {

        System.out.println("異步執行test2!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());
    }

    @Async
    public void test3() {

        System.out.println("異步執行test3!!!");
        System.out.println("線程id:" + Thread.currentThread().getId());
        System.out.println("線程名稱:" + Thread.currentThread().getName());
    }
}

  配置文件能夠本身配置線程池的相關參數,本身能夠配置多個線程池,使用時,用@Async(value="beanId")區分便可函數

  注意點:
  @EnableAsync註解與<task:annotation-driven executor="messageExecutor"/>等價,二者只能使用其一,否則啓動會報錯單元測試

  • java編程方式配置自定義線程池,以下:

 

package cn.leadeon.message.base.threadpool;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 流量消息線程池配置
 *
 * @author LiJunJun
 * @since 2018/10/10
 */
@Configuration
public class ThreadPoolConfiguration {

    /**
     * 核心線程數:線程池建立時候初始化的線程數
     */
    @Value("${executor.core.pool.size}")
    private int corePoolSize;

    /**
     * 最大線程數:線程池最大的線程數,只有在緩衝隊列滿了以後纔會申請超過核心線程數的線程
     */
    @Value("${executor.max.pool.size}")
    private int maxPoolSize;

    /**
     * 緩衝隊列200:用來緩衝執行任務的隊列
     */
    @Value("${executor.queue.capacity}")
    private int queueCapacity;

    /**
     * 容許線程的空閒時間(單位:秒):當超過了核心線程出以外的線程在空閒時間到達以後會被銷燬
     */
    @Value("${executor.keepalive.Seconds}")
    private int keepAliveSeconds;

    /**
     * 線程池名的前綴:設置好了以後能夠方便咱們定位處理任務所在的線程池
     */
    @Value("${executor.thread.name.prefix}")
    private String threadNamePrefix;

    @Bean
    public Executor MessageExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);

        // 線程池對拒絕任務的處理策略:這裏採用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;若是執行程序已關閉,則會丟棄該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
  • 測試

 

package cn.leadeon.message.test;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * spring線程池單元測試
 *
 * @author LiJunJun
 * @since 2018/10/11
 */
public class TestSpringThreadPool extends JunitTestBase {

    @Autowired
    private AsyncTest asyncTest;

    /**
     * spring線程池單元測試
     */
    @Test
    public void testThreadPool() {

        System.out.println("主線程id:" + Thread.currentThread().getId());
        System.out.println("主線程名稱:" + Thread.currentThread().getName());
        asyncTest.test1();
        asyncTest.test2();
        asyncTest.test3();

    }
}

 

測試結果:主線程和異步方法分別使用了不一樣的線程去調用,測試完成

相關文章
相關標籤/搜索