java-從java線程池來看java的阻塞隊列

一說到java的阻塞隊列,咱們就會想到在java的jdk中的那麼多的類java

1.ArrayDeque, (數組雙端隊列)
2.PriorityQueue, (優先級隊列)
3.ConcurrentLinkedQueue, (基於鏈表的併發隊列)
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口)
5.ArrayBlockingQueue, (基於數組的併發阻塞隊列)
6.LinkedBlockingQueue, (基於鏈表的FIFO阻塞隊列)
7.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列)
8.PriorityBlockingQueue, (帶優先級的無界阻塞隊列)
9.SynchronousQueue (併發同步阻塞隊列)bootstrap

這裏不去細說的這些東西,而是從線程池的一個異常來聊聊這個事情數組

構造一個線程池異常 - 線程池過載異常

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,10000, TimeUnit.DAYS,
                new LinkedBlockingDeque<>(2));//這裏若是指定了固定的長度就表示是有界隊列了
        List<Future<?>> futures = new ArrayList<>();
        for (int a=0;a<1000;a++){
            futures.add(executor.submit(()->{
                System.out.println("thread start : "+Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread end : "+Thread.currentThread().getName());
            }));
        }
        for (Future<?> future: futures){
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

這段代碼若是直接運行將會拋出異常併發

OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7a07c5b4[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3d646c37[Wrapped task = org.java.deme.ThreadPoolTest$$Lambda$14/0x0000000840067840@41cf53f9]] rejected from java.util.concurrent.ThreadPoolExecutor@5a10411[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at org.java.deme.ThreadPoolTest.main(ThreadPoolTest.java:13)

總結一下: 其實這個問題是加入的線程數量已經超過了整個線程池能負載的最大數量了(新建線程池的時候使用了有界隊列),因此拋出了了異常app

避免線程池溢出異常 - 使用無界隊列和有界隊列

  1. BlockingQueue

這個是爲了解決java併發同步問題的,本質上是解決線程間消息同步而設計的spa

有一下幾個類:線程

1.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口) 這個隊列是無界的,而且沒有指定長度的構造方法
2.ArrayBlockingQueue, (基於數組的併發阻塞隊列) 必須設置長度
3.LinkedBlockingQueue, (基於鏈表的FIFO阻塞隊列) 沒有指定長度就是有界的反之是有界的
4.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列) 沒有指定長度就是有界的反之是有界的
5.PriorityBlockingQueue, (帶優先級的無界阻塞隊列) 這個只能傳入Comperable接口的類新,不是有界的
6.SynchronousQueue (併發同步阻塞隊列)不能指定長度,只能傳入一個值,有界設計

回過來看看上面的源碼,其實線程池在加入線程時候的邏輯是這樣的3d

構建常駐線程coreNum指定,若是超過,創建臨時線程maxNum , 還不夠,增長到隊列中code

線程池判斷能不能增長僅對列是使用的隊列的offset方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

若是是有界的隊列黨對列滿了天然返回false,由於添加不進去了,而後就會拋出異常

總結一下

咱們在使用java線程池的時候須要作好容量規劃,若是沒法肯定是否超出了指定的線程數量,能夠使用無界隊列,可是要注意到防止內存泄漏

相關文章
相關標籤/搜索