用java自制簡易線程池(不依賴concurrent包)

好久以前人們爲了繼續享用並行化帶來的好處而不想使用進程,因而創造出了比進程更輕量級的線程。以linux爲例,建立一個進程須要申請新的本身的內存空間,從父進程拷貝一些數據,因此開銷是比較大的,線程(或稱輕量級進程)能夠和父進程共享內存空間,讓建立線程的開銷遠小於建立進程,因而就有了如今多線程的繁榮。
可是即使建立線程的開銷很小,但頻繁建立刪除也是很浪費性能的,因而人們又想到了線程池,線程池裏的線程只會被建立一次,用完也不會銷燬,而是在線程池裏等待被重複利用。這種尤爲適用於多而小的任務。舉個極端點的例子,若是一個小任務的執行消耗不及建立和銷燬一個線程的消耗,那麼不使用線程池時一大半的性能消耗都會是線程建立和銷燬。 最開始學java的時候,一直不理解線程池,尤爲是理解不了線程是如何被複用的,以及線程池和我建立的Thread/Runnable對象有什麼關係,今天咱們就來寫一個建議的線程池來理解這一切。(不依賴java concurrent包)
首先糾正不少人的一個誤解,咱們new一個Thread/Runnable對象的時候,並非建立出一個線程,而是建立了一個須要被線程執行的任務,當咱們調用Thread.start()方法的時候,jvm纔會幫咱們建立一個線程。線程池只是幫你執行這些任務而已,你submit的時候只是把這個任務放到某個存儲裏,等待線程池裏空閒的線程來執行,而不是建立線程。知道了這點,因此咱們首先得有個東西來存儲任務,還要支持多線程下的存取,最好還支持阻塞以免無任務時的線程空轉。
除了存儲外,咱們還須要一些線程來消費這些任務,看到這你可能就很明白的知道了這實際上是個生產者消費者模型,Java有好多種生產者消費者的實現,能夠參考我以前的博客Java生產者消費者的幾種實現方式。若是實現線程池,咱們能夠選擇使用BlockingQueue來實現。雖然java concurrent包裏已經實現了好多BlockingQueue,但爲了讓你們理解BlockingQueue作了啥,我這裏用LinkedListQueue簡單封裝了一個簡易BlockingQueue,代碼以下。
java

package me.xindoo.concurrent;

import java.util.LinkedList;
import java.util.Queue;

public class LinkedBlockingQueue<E> {
    private Object lock;
    private Queue<E> queue;
    public LinkedBlockingQueue() {
        queue = new LinkedList<>();
        lock = new Object();
    }

    public boolean add(E e) {
        synchronized (lock) {
            queue.add(e);
            lock.notify();
            return true;
        }
    }

    public  E take() {
        synchronized (lock) {
            while (queue.isEmpty()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return queue.poll();
        }
    }

    public synchronized int size() {
        return queue.size();
    }
}
複製代碼

我也只是簡單在LinkedListQueue的基礎上對其加了synchronized,以保證它在多線程環境下的正常運轉。其次我在隊列爲空時經過wait()方法加了線程阻塞,以防止空隊列時線程空轉。既然加了阻塞也得加喚醒,每次在往隊列裏添加任務的時候,就會調用notify()來喚醒一個等待中的線程。
存儲搞定了,咱們接下來須要實現的就是消費者。消費者就是線程池裏的線程,由於任務隊列裏的任務都是實現了Runnable接口,因此咱們消費任務時均可以直接調用其run()方法來執行。當一個任務執行完成後在從隊列裏去取,知道整個線程池被shutdown。
linux

package me.xindoo.concurrent;

public class ThreadPool {
    private int coreSize;
    private boolean stop = false;
    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private Thread[] threads;

    public ThreadPool(int coreSize)  {
        this.coreSize = coreSize;
        threads = new Thread[coreSize];
        for (int i = 0; i < coreSize; i++) {
            threads[i] = new Thread(new Worker("thread"+i));
            threads[i].start();
        }
    }

    public boolean submit(Runnable command) {
        return tasks.add(command);
    }

    public void shutdown() {
        stop = true;
    }

    private class Worker implements Runnable {
        public String name;

        public Worker(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            while (!stop) {
                Runnable command = tasks.take();
                System.out.println(name + " start run, starttime " + System.currentTimeMillis()/1000);  //方便觀察線程的執行狀況
                command.run();
                System.out.println(name + " finished, endtime " + System.currentTimeMillis()/1000);
            }
        }
    }
}
複製代碼

上面就是一個線程池的實現,是否是很簡單,在構造函數裏初始化固定數目的線程,每一個線程作的只是從隊列裏取任務,執行……一直循環。
沒錯,一個簡易的線程池就經過上面幾十行的代碼實現了,已經能夠拿去用了,甚至用在生產環境都沒啥問題(後果自負,哈哈)。固然這不是一個相似於concurrent包中功能完善、各類參數可自定義的線程池,但確確實實它實現了一個線程池的基本功能——線程的複用。 接下來寫個建議的測試代碼,若是線程池生產者消費者模型中的消費者,那這個測試代碼就是生產者,代碼以下。
多線程

package me.xindoo.concurrent;

public class Test {
    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(5);

        for (int i = 0; i < 30; i++) {
            pool.submit(new Task());
        }

        System.out.println("add task finished");

        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}複製代碼

執行結果以下jvm

thread0 start run, starttime 1566724202
thread2 start run, starttime 1566724202
thread1 start run, starttime 1566724202
thread3 start run, starttime 1566724202
thread4 start run, starttime 1566724202
add task finished
thread2 finished, endtime 1566724207
thread2 start run, starttime 1566724207
thread1 finished, endtime 1566724207
thread4 finished, endtime 1566724207
thread3 finished, endtime 1566724207
thread0 finished, endtime 1566724207
thread3 start run, starttime 1566724207
thread4 start run, starttime 1566724207
thread1 start run, starttime 1566724207
thread0 start run, starttime 1566724207
thread3 finished, endtime 1566724212
thread0 finished, endtime 1566724212
thread1 finished, endtime 1566724212
thread4 finished, endtime 1566724212
thread2 finished, endtime 1566724212複製代碼

測試代碼也很是簡單,建立一個5個線程,而後提交30個任務,從輸出也能夠看到的確是5個線程分批次執行完了30個任務。備註:雖然我測試代碼裏的任務很是簡單,其實複雜的任務也是能夠的。ide

總結

實時上如上文中好幾回提到,java.util.concurrent包裏已經幫你們實現了一個很健壯、功能強大的線程池,你們沒必要再去造輪子了,使用不一樣的BlockingQueue就能夠實現不一樣功能的線程池。舉個栗子,好比使用DelayedWorkQueue就能夠實現能夠按期執行的線程池了。 甚至Executors爲你們封裝了更爲簡易的線程池建立接口,可是《Alibaba Java開發手冊》強制不容許使用 Executors 去建立線程池,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
函數

  1. FixedThreadPool 和 SingleThreadPool: 容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool: 容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM。

最後說點題外話,以前咱們一個服務啓動的時候觸發了一個jdk未修復的bug bugs.java.com/bugdatabase…,致使線程池裏全部的任務都被阻塞,但其餘工做線程一直在往裏提交任務,由於咱們直接使用了Executors.FixedThreadPool 因此最後內存爆了..... 後來咱們的就結局方案就是直接使用ThreadPoolExecutor,限制了BlockingQueue的大小。
性能

版權聲明:本文爲博主原創文章,轉載請註明出處。 博客地址:xindoo.blog.csdn.net/測試

相關文章
相關標籤/搜索