在併發相關,不單單依靠以前介紹的各類鎖或者隊列操做--JAVA併發之多線程基礎(5),同時咱們也須要考慮到資源的消耗狀況(力扣上各類題目比消耗與時間。。)。這個時候咱們就引入了線程池。java
針對於你們熟悉的Executors
進行入手,咱們常常性的使用裏面的線程池。固然,根據阿里巴巴的規範手冊上來講,不建議咱們直接經過這個類去建立一個線程池,須要經過ThreadPoolExecutor
自行去建立,這樣會讓咱們懂得線程池中的線程各個時間的狀態變化,以防止線程池中的線程異常。緩存
Executors
newFixedThreadPool(int nThreads)
建立一個固定大小的線程池。newSingleThreadExecutor()
建立單一線程的線程池newCachedThreadPool()
建立一個帶緩存的線程池,裏面的線程會執行完成以後會存在一段時間,沒有執行就釋放。newScheduledThreadPool(int corePoolSize)
建立一個計劃任務的線程池前三個裏面的實現都是利用了ThreadPoolExecutor
,只不過傳入的參數是不一樣的,而後造就了不一樣的線程池,接下來就看看ThreadPoolExecutor
裏面參數的各個含義。多線程
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
corePoolSize
核心線程池大小maximumPoolSize
線程池最大容量keepAliveTime
線程存活時間unit
存活時間的單位workQueue
阻塞隊列存放裏面執行任務默認會幫咱們填充的兩個參數:併發
threadFactory
線程工廠,用於產生線程放入到線程池中handler
拒絕策略處理器,用於當前線程池中拒絕多餘的任務等,默認是AbortPolicy
拒絕策略線程工廠裏面就會幫咱們產生一個個線程放入到線程池中:jvm
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())//是否爲守護線程
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)//是否爲默認的優先級
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
複製代碼
在線程池中的execute(Runnable command)
方法是會去幫助咱們去執行咱們所須要的任務:ide
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//ctl是一個包裝的原子類,裏面包含了線程的數量以及狀態
if (workerCountOf(c) < corePoolSize) {//工做線程數量小於當前的核心線程數
if (addWorker(command, true))//加入到隊列中,而且true表明使用核心線程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//判斷線程池是否在運行,同時將任務加入到隊列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//若是線程池不是運行狀態則進行拒絕任務操做
reject(command);
else if (workerCountOf(recheck) == 0)//若是線程數爲0,則開闢一個線程去執行,不採用核心線程
addWorker(null, false);
}
else if (!addWorker(command, false))//加入隊列失敗則進行線程數增長,這裏採用的線程是大於核心線程數小於最大線程數。
reject(command);
}
複製代碼
ForkJoinTask
這是一個比較特殊的線程池,能夠將一個很大的任務進行分解成爲若干個小的任務去執行。執行完成以後再將每一個任務的結果進行整合返回。底下派生出兩個抽象類:
RecursiveAction
是沒有返回值的,只是將任務劃分去執行。RecursiveTask
是有返回值的,將任務執行完成以後結果整合進行返回。post
package com.montos.lock;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;//閾值判斷
if (canCompute) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end)
lastOne = end;
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();//子線程進行求解
}
for (CountTask t : subTasks) {
sum += t.join();//返回全部的結果集進行求和返回
}
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000l);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
Long res = result.get();
System.out.println("result is :" + res);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
複製代碼
上面是有返回值的demo,執行完以後控制檯會會出現result is :20000100000
這個返回結果。this
有些小夥伴可能會遇到兩個錯誤(修改裏面的參數值,而致使出現的問題,這裏我就遇到了!!):spa
java.util.concurrent.ExecutionException:java.lang.StackOverflowError
。緣由是由於ForkJoin
不會對堆棧進行控制,編寫代碼時注意方法遞歸不能超過jvm的內存,若是必要須要調整jvm的內存:在Eclipse中JDK的配置中加上 -XX:MaxDirectMemorySize=128
(默認是64M)。改成128後不報棧溢出,可是報下一個錯。java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node
。這個致使的緣由是由於子任務的處理長度不平衡。咱們須要對原來的長度進行計算處理。至此
JDK
中大部分的併發類都談及到用法,對於底層代碼的描述和處理,這塊期待我以後的文章。線程