Spring Boot 給咱們提供了一個線程池的實現 ThreadPoolTaskExecutor,它的底層是由 JDK 的線程池 ThreadPoolExecutor 來實現的。相較於 JDK 提供的線程池,它進行了一些功能的加強,好比對線程狀態的監聽,咱們在使用的時候更加方便。在這裏給各位同學一個配置模板,並簡單講解下 Spring 線程池的底層原理(在最後的源碼章節)。
@Configuration:這是 Spring 3.0 添加的一個註解,用來代替 applicationContext.xml 配置文件,所有這個配置文件裏面能做到的事情都可以通過這個註解所在的類來進行註冊。
@Bean:用來代替 XML 配置文件裏面的 <bean ...> 配置。
線程池建立以後,線程池中的線程數爲0,當任務過來就會建立一個線程去執行,直到線程數達到 corePoolSize 以後,就會把新到達的任務放在隊列中;隊列滿了以後纔會繼續建立線程,直到線程數達到 maxPoolSize。換句更精煉的話:corePoolSize 表示線程池中常駐的核心線程數,而不是容許同時運行的最大線程數(那是 maxPoolSize)。
若是執行了線程池的 prestartAllCoreThreads() 方法,線程池會提早建立並啓動全部核心線程。
Spring 線程池會對上述的參數進行包裝,可能你看到的真正配置時的名稱不同,但實際的做用是同樣的。
這是博主自己寫的一個關於 Spring Boot 線程池的配置類,參考了一些文章的規範,可以直接使用。
@EnableAsync @Configuration public class LogThreadPoolConfig { @Bean(name = "logThreadPool") public ThreadPoolTaskExecutor LogThreadPoolTask() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); LogThreadPoolProperties properties = this.logThreadPoolProperties(); executor.setCorePoolSize(properties.getCorePoolSize()); executor.setMaxPoolSize(properties.getMaxPoolSize()); executor.setQueueCapacity(properties.getQueueCapacity()); executor.setKeepAliveSeconds(properties.getKeepAliveSeconds()); executor.setThreadNamePrefix(properties.getThreadName()); switch (properties.getRejectedExecutionHandler()) { case "abortPolicy": executor.setRejectedExecutionHandler(new AbortPolicy()); break; case "callerRunsPolicy": executor.setRejectedExecutionHandler(new CallerRunsPolicy()); break; case "discardOldestPolicy": executor.setRejectedExecutionHandler(new DiscardOldestPolicy()); break; case "discardPolicy": executor.setRejectedExecutionHandler(new DiscardOldestPolicy()); break; default: executor.setRejectedExecutionHandler(new CallerRunsPolicy()); break; } executor.initialize(); return executor; } @Bean @ConfigurationProperties(prefix = "threadpool.log") public LogThreadPoolProperties logThreadPoolProperties() { return new LogThreadPoolProperties(); } //@Getter lombok提供的getset方法生成註解 //@Setter @Configuration public static class LogThreadPoolProperties { /** * 線程前綴名 */ private String threadName; /** * 核心線程池大小 */ private int corePoolSize; /** * 最大線程數 */ private int maxPoolSize; /** * 隊列大小 */ private int queueCapacity; /** * 線程池維護空閒線程存在時間 */ private int keepAliveSeconds; /** * 拒絕策略 */ private String rejectedExecutionHandler; } }
這樣就可以在 yml 文件中配置參數了:
# Settings for the log thread pool (prefix: threadpool.log).
threadpool:
  log:
    threadName: ThreadPool-log-   # thread name prefix
    corePoolSize: 8               # core pool size: about 2x CPU cores suggested for IO-bound work, cores + 1 for CPU-bound work
    maxPoolSize: 16               # maximum pool size
    queueCapacity: 1000           # capacity of the blocking queue
    keepAliveSeconds: 60          # how long idle threads may stay alive, in seconds
    # rejection policy, one of: abortPolicy, callerRunsPolicy, discardOldestPolicy, discardPolicy
    rejectedExecutionHandler: callerRunsPolicy
Spring 提供了註解方式來方便咱們使用線程池,只需要在要異步處理的方法上加 @Async("你配置的線程池名字") 就可以了。注意這個類需要被 Spring 掃描並納入管理,因此要加 @Service、@Component 等註解。
@Service public class ServiceImpl implements Service { @Override @Async("logThreadPool") public void addOperationLog(BaseLog baseLog) { //你要異步執行的邏輯 } }
具體的異步效果能夠自測一下
springboot給咱們提供了一個線程池的實現,它的底層是由咱們傳統線程池ThreadPoolExecutor來實現的。
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = 2147483647; private int keepAliveSeconds = 60; private int queueCapacity = 2147483647; private boolean allowCoreThreadTimeOut = false; private TaskDecorator taskDecorator; /** * 在這能夠看到,其底層封裝了咱們熟悉的threadPoolExecutor,這是JDK提供給咱們的線程池實現 */ private ThreadPoolExecutor threadPoolExecutor; public ThreadPoolTaskExecutor() { } /** * 這些都是些get/set */ public void setCorePoolSize(int corePoolSize) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); } } } public int getCorePoolSize() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.corePoolSize; } } public void setMaxPoolSize(int maxPoolSize) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.maxPoolSize = maxPoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } } } public int getMaxPoolSize() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.maxPoolSize; } } public void setKeepAliveSeconds(int keepAliveSeconds) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.keepAliveSeconds = keepAliveSeconds; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setKeepAliveTime((long)keepAliveSeconds, TimeUnit.SECONDS); } } } public int getKeepAliveSeconds() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.keepAliveSeconds; } } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { this.allowCoreThreadTimeOut = 
allowCoreThreadTimeOut; } public void setTaskDecorator(TaskDecorator taskDecorator) { this.taskDecorator = taskDecorator; } /** * 這是初始化方法,能夠在這裏把JDK提供的ThreadPoolExecutor初始化了 */ protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { public void execute(Runnable command) { super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command)); } }; } else { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; } protected BlockingQueue<Runnable> createQueue(int queueCapacity) { return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue()); } public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); return this.threadPoolExecutor; } public int getPoolSize() { return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize(); } public int getActiveCount() { return this.threadPoolExecutor == null ? 
0 : this.threadPoolExecutor.getActiveCount(); } public void execute(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { executor.execute(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public void execute(Runnable task, long startTimeout) { this.execute(task); } public Future<?> submit(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public <T> Future<T> submit(Callable<T> task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } //這些都是些Spring對線程池的功能加強,通常用不到 public ListenableFuture<?> submitListenable(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object)null); executor.execute(future); return future; } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public <T> ListenableFuture<T> submitListenable(Callable<T> task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { ListenableFutureTask<T> future = new ListenableFutureTask(task); executor.execute(future); return future; } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public boolean prefersShortLivedTasks() { return true; } }
ThreadPoolTaskExecutor 繼承了 ExecutorConfigurationSupport,其實它主要是完成線程池的初始化的:
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { protected final Log logger = LogFactory.getLog(this.getClass()); private ThreadFactory threadFactory = this; private boolean threadNamePrefixSet = false; private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy(); private boolean waitForTasksToCompleteOnShutdown = false; private int awaitTerminationSeconds = 0; private String beanName; private ExecutorService executor; public ExecutorConfigurationSupport() { } public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = (ThreadFactory)(threadFactory != null ? threadFactory : this); } public void setThreadNamePrefix(String threadNamePrefix) { super.setThreadNamePrefix(threadNamePrefix); this.threadNamePrefixSet = true; } public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { this.rejectedExecutionHandler = (RejectedExecutionHandler)(rejectedExecutionHandler != null ? rejectedExecutionHandler : new AbortPolicy()); } public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) { this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown; } public void setAwaitTerminationSeconds(int awaitTerminationSeconds) { this.awaitTerminationSeconds = awaitTerminationSeconds; } public void setBeanName(String name) { this.beanName = name; } /** * 這裏就是在bean初始化完後調用線程池的初始化方法生成線程池實例 * 並被Spring容器管理 */ public void afterPropertiesSet() { this.initialize(); } public void initialize() { if (this.logger.isInfoEnabled()) { this.logger.info("Initializing ExecutorService " + (this.beanName != null ? 
" '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { this.setThreadNamePrefix(this.beanName + "-"); } this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); } protected abstract ExecutorService initializeExecutor(ThreadFactory var1, RejectedExecutionHandler var2); public void destroy() { this.shutdown(); } public void shutdown() { if (this.logger.isInfoEnabled()) { this.logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (this.waitForTasksToCompleteOnShutdown) { this.executor.shutdown(); } else { this.executor.shutdownNow(); } this.awaitTerminationIfNecessary(); } private void awaitTerminationIfNecessary() { if (this.awaitTerminationSeconds > 0) { try { if (!this.executor.awaitTermination((long)this.awaitTerminationSeconds, TimeUnit.SECONDS) && this.logger.isWarnEnabled()) { this.logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } } catch (InterruptedException var2) { if (this.logger.isWarnEnabled()) { this.logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } Thread.currentThread().interrupt(); } } } }
上述好多的參數其實都是 JDK 線程池需要的,它們具體的功能和做用可以通過閱讀線程池源碼來了解,詳見《線程池源碼解析》。