簡單的描述一下業務場景,項目裏面有一個分佈式定時job ,按期去扒取數據,那麼有兩層循環,第一層是大概1000條數據 ,而後第二層循環每一個數據下面大概20個子數據,而後經過多線程的方式去扒取數據入庫。這就是一個簡單的業務背景。這個對剛入行的小白寫代碼得注意了!程序員
做爲程序員的我也是第一次遇到這個問題,雖然這個問題解決很簡單,可是形成的影響很大,我以爲仍是有必要作一個小總結,小夥伴們之後寫相似代碼的時候就會有注意,這個問題的形成是我一個同事沒有理解透線程池,致使的一個很大的問題。那麼先說問題以前先扒拉扒拉線程池。數據庫
首先我先說明一點在企業中通常都是自定義線程池,不多使用jdk 給咱們提供的幾種線程池方法,這樣是爲了作到一個可控制。bash
這裏幫你們只是一次簡單的回顧吧,具體線程池網上已經一大片一大片的文章了。多線程
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(1000),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
複製代碼
那麼說到線程池無非就理解這幾個參數。併發
流程異步
那麼上面也是整個線程池的核心流程作了一個描述。分佈式
上面已經描述了線程池的流程和原理,下面自定義線程池直接貼代碼了,就不作過多的闡述了。ide
// 定義一個線程池類
public class MyWorkerThreadPool {
private static final int MAX_QUEUE_SIZE = 1000;
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(MAX_QUEUE_SIZE),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
public static void submitTak(Runnable run) {
poolExecutor.submit(run);
}
public static void shutdown() {
poolExecutor.shutdown();
}
}
// 定義一個拒絕策略
public class MyRejectedExecutionHandler implements RejectedExecutionHandler{
private final Log logger = LogFactory.getLog(this.getClass());
private int maxQueueSize;
public MyRejectedExecutionHandler(int maxQueueSize){
this.maxQueueSize=maxQueueSize;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("提交任務失敗了" +
"當前隊列最大長度" +
"maxQueueSize="+maxQueueSize+
",maximumPoolSize="+executor.getMaximumPoolSize());
if(executor.getQueue().size()<maxQueueSize){
executor.submit(r);
}else{
try {
Thread.sleep(3000);
executor.submit(r);
}catch (Exception e){
//此異常忽略
executor.submit(r);
}
}
}
}
複製代碼
那麼接下來就是重點了,這裏只貼一部分僞代碼,前面已經說了這是一個分佈式定時job,這裏是我精簡了同事的代碼提煉出來的。this
//模擬場景
for(int i= 0;i< 1000;i++){
for(int j = 0;j<20;j++){
MyWorkerThreadPool.submitTak(()->{
// 真實業務場景這裏很是耗時,
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
執行結果:
提交任務失敗了當前隊列最大長度maxQueueSize=1000,maximumPoolSize=30
提交任務失敗了當前隊列最大長度maxQueueSize=1000,maximumPoolSize=30
提交任務失敗了當前隊列最大長度maxQueueSize=1000,maximumPoolSize=30
提交任務失敗了當前隊列最大長度maxQueueSize=1000,maximumPoolSize=30
提交任務失敗了當前隊列最大長度maxQueueSize=1000,maximumPoolSize=30
複製代碼
這裏 第一層模擬了1000條數據,第二層循環30條數據,同事的代碼,致使同一時間咱們自定義的線程隊列爆滿,2000*30 這個是要開多少個線程啊。細細想下是否是很恐怖,這麼使用會致使什麼後果呢,當前業務又不少都被拒絕掉沒有執行,另外當線程池爆滿後,咱們項目的其它功能執行異步方法也會被拒絕掉,結果可想而知。spa
那麼如何解決了,其實很簡單,咱們跑這個比較耗時的任務咱們能夠指定10個線程去跑就是了,這樣就不會影響到其它的功能業務。 我這裏簡單的使用了一個計數器,好比超過了10個線程則阻塞等待一會,跑完一個線程則減一,直接上代碼
public class TestMain {
public static void main(String[] args) throws Exception{
//模擬場景,這是一個分佈式定時任務,因此不會存在併發同時執行的問題
AtomicInteger threadNum = new AtomicInteger(0);
// 模擬當前第一層有1000數據
for(int i= 0;i< 1000;i++){
// 模擬每條線路有20條子數據
for(int j = 0;j<20;j++){
// 多線程拉去爬取網上的數據彙總到數據庫
// 一次最多開啓10個線程取執行耗時操做,
while (threadNum.get() > 10){
// 能夠小睡一會再看是否有資格執行
Thread.sleep(500);
}
// 小增長1
threadNum.incrementAndGet();
int tempI = i;
int tempJ = j;
MyWorkerThreadPool.submitTak(()->{
// 真實業務場景這裏很是耗時,
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"執行完第一層數據"+ tempI +"第二層:"+tempJ);
// 執行完減小一個1
threadNum.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
執行結果:
pool-2-thread-2執行完第一層數據0第二層:1
pool-2-thread-1執行完第一層數據0第二層:0
pool-2-thread-3執行完第一層數據0第二層:2
pool-2-thread-2執行完第一層數據0第二層:3
pool-2-thread-3執行完第一層數據0第二層:5
pool-2-thread-1執行完第一層數據0第二層:4
pool-2-thread-3執行完第一層數據0第二層:7
pool-2-thread-1執行完第一層數據0第二層:8
pool-2-thread-2執行完第一層數據0第二層:6
pool-2-thread-1執行完第一層數據0第二層:10
pool-2-thread-3執行完第一層數據0第二層:9
pool-2-thread-2執行完第一層數據0第二層:11
pool-2-thread-3執行完第一層數據0第二層:13
pool-2-thread-2執行完第一層數據0第二層:14
pool-2-thread-1執行完第一層數據0第二層:12
複製代碼
其實咱們開發中又不少小細節,只是咱們有時候沒有注意或對其原理不清楚,有時候寫出來的代碼就會帶來比較糟糕的後果。那麼今天這篇就到這裏!