線程池的建立以及CyclicBarrier與CountDownLatch的簡單使用

1、線程池的簡單建立

(1)、使用Executors進行建立

ExecutorService poo1 = Executors.newFixedThreadPool(10);

        ExecutorService pool = Executors.newSingleThreadExecutor();

這兩種線程池都是無界隊列的線程池,建立比較簡單,但可能致使堆積請求處理隊列而消耗很是大的內存。html

ExecutorService poo1 = Executors.newCachedThreadPool();

這種可緩存線程池,建立無腦暴力,能夠建立Integer.MAX_VALUE個線程,若是處理不當對內存消耗巨大。java

ExecutorService poo1 = Executors.newScheduledThreadPool(10);

這是支持定時及週期性任務執行的一種線程池,不過一樣會有上面的問題。面試

(2)、經過ThreadPoolExecutor建立線程池

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("這是線程%d - ").build();

        ExecutorService pool = new ThreadPoolExecutor(3, 6, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.AbortPolicy());

以上是一個簡單的實例,表示建立一個核心線程數爲3,最大線程數爲6,超時0s就進行回收,緩存隊列容量爲3,並使用AbortPolicy的拒絕策略緩存

AbortPolicy         -- 當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常。
CallerRunsPolicy    -- 當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務。
DiscardOldestPolicy -- 當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,而後將被拒絕的任務添加到等待隊列中。
DiscardPolicy       -- 當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務。

(3)、簡單玩一玩這個線程池

public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("這是線程%d - ").build();

        ExecutorService pool = new ThreadPoolExecutor(3, 6, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Integer sout = i;
            new Thread(() -> {
                pool.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + sout);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }).start();
        }
    }

打印:

這是線程0 - 0
這是線程1 - 1
這是線程2 - 2
這是線程3 - 6
這是線程4 - 7
這是線程5 - 8
Exception in thread "Thread-9" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@49984548 rejected from java.util.concurrent.ThreadPoolExecutor@475fb20b[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
這是線程0 - 3
這是線程1 - 4
這是線程2 - 5

能夠看出, 最後一個線程(i = 9)被拒絕了,其餘的線程都獲得了執行,雖然咱們指定核心線程池爲3,但仍是先執行了6個線程(最大線程),等待了2秒後,再執行被放入隊列中的線程。併發

從上面打印的順序,咱們也能看出蹊蹺,它並非按照0 - 8的順序來進行打印的,這是由於新的線程進入線程池後,若是超過了核心線程池大小,並不會去直接開啓新的線程,而是先放入隊列,直到隊列滿了,纔會開啓新的線程。工具

咱們把maximumPoolSize從6改成4,會發生什麼呢?ui

這是線程0 - 0
這是線程1 - 1
這是線程2 - 2
這是線程3 - 6
Exception in thread "Thread-7" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1abd1e5 rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-8" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1a371600 rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-9" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@317d0b0e rejected from java.util.concurrent.ThreadPoolExecutor@f6c2fa0[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.example.demo.concurrentutil.TestCorrent.lambda$main$1(TestCorrent.java:28)
	at java.lang.Thread.run(Thread.java:745)
這是線程0 - 3
這是線程1 - 4
這是線程2 - 5

很顯然,只有7個線程獲得了執行。線程

再玩玩拒絕策略,好比 CallerRunsPolicycode

ExecutorService pool = new ThreadPoolExecutor(3, 4, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

打印:

這是線程0 - 0
這是線程1 - 1
這是線程2 - 2
這是線程3 - 6
Thread-77
Thread-88
Thread-99
這是線程0 - 3
這是線程1 - 4
這是線程2 - 5

上面被拒絕的三個線程,扔回給了它的調用方執行。orm

再好比 DiscardOldestPolicy 拒絕策略

這是線程0 - 0
這是線程1 - 1
這是線程2 - 2
這是線程3 - 6
這是線程0 - 7
這是線程1 - 8
這是線程2 - 9

直接將等待隊列最末尾的任務放棄掉了,也就是先進入隊列的345任務,被線程池放棄了。

最後一種 DiscardPolicy 就不贅述了,直接放棄被拒絕的任務。

2、簡單用用 CountDownLatch

正如每一個Java文檔所描述的那樣,CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行。在Java併發中,countdownlatch的概念是一個常見的面試題,因此必定要確保你很好的理解了它。

CountDownLatch countDownLatch = new CountDownLatch(2);

        pool.submit(new Thread(() -> {
            try {
                sout("進入線程");
                countDownLatch.await();
                sout("繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        pool.submit(new Thread(() -> {
            try {
                Thread.sleep(7000);
                sout("進入線程");
                countDownLatch.await();
                sout("繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        for (int i = 0; i < 2; i++) {
            pool.submit(new Thread(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
                sout("countDown");
            }));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        pool.shutdown();

這裏有四個線程,一個 須要同步的線程 先開始執行,五秒後,countDown,七秒後,另外一個 須要同步的線程 也開始執行,十秒後,再次countDown。

這是線程0 - 進入線程
這是線程2 - countDown
這是線程1 - 進入線程
這是線程3 - countDown
這是線程0 - 繼續執行
這是線程1 - 繼續執行

在線程3進行countDown後,線程0和線程1會從等待中恢復,並執行任務。

3、簡單用用 CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。

CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        pool.submit(new Thread(() -> {
            try {
                sout("進入線程");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                sout("繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        pool.submit(new Thread(() -> {
            try {
                Thread.sleep(7000);
                sout("進入線程");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                sout("繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

CyclicBarrier的使用比CountDownLatch更加簡單,它只須要簡單使用cyclicBarrier.await();就能夠了。

相關文章
相關標籤/搜索