一個簡單的線程池實現

線程池代碼
java

import java.util.List;
import java.util.Vector;
public class ThreadPool 
{
    private static ThreadPool instance_ = null;
    //定義優先級別常數,空閒的線程按照優先級不一樣分別存放在三個vector中
    public static final int LOW_PRIORITY = 0; 
    public static final int NORMAL_PRIORITY = 1;
    public static final int HIGH_PRIORITY = 2;
    //保存空閒線程的List,或者說它是"池"
    private List<PooledThread>[] idleThreads_;  
    private boolean shutDown_ = false;
    private int threadCreationCounter_; //以建立的線程的個數
    private boolean debug_ = false;    //是否輸出調試信息
    //構造函數,由於這個類視做爲singleton實現的,所以構造函數爲私有
    private ThreadPool() 
    {       
        // 產生空閒線程.三個vector分別存放分別處在三個優先級的線程的引用
        List[] idleThreads = {new Vector(5), new Vector(5), new Vector(5)};
        idleThreads_ = idleThreads;
        threadCreationCounter_ = 0;
    }
    
    public int getCreatedThreadsCount() {
        return threadCreationCounter_;
    }
    //經過這個函數獲得線程池類的實例
    public static ThreadPool instance() {
        if (instance_ == null)
            instance_ = new ThreadPool();
        return instance_;
    }
    
    public boolean isDebug() {
        return debug_;
    }
    
    //將線程repoolingThread重新放回到池中,這個方式是同步方法。
    //這個方法會在多線程的環境中調用,設計這個方法的目的是讓工做者線程
    //在執行完target中的任務後,調用池類的repool()方法,
    //將線程自身重新放回到池中。只因此這麼作是由於線程池並不能預見到
    //工做者線程什麼時候會完成任務。參考PooledThread的相關代碼。
    protected synchronized void repool(PooledThread repoolingThread)
    {
        if (!shutDown_) 
        {
            if (debug_)
            {
                System.out.println("ThreadPool.repool() : repooling ");
            }
            switch (repoolingThread.getPriority())
            {
                case Thread.MIN_PRIORITY :
                {
                    idleThreads_[LOW_PRIORITY].add(repoolingThread);
                    break;
                }
                case Thread.NORM_PRIORITY :
                {
                    idleThreads_[NORMAL_PRIORITY].add(repoolingThread);
                    break;
                }
                case Thread.MAX_PRIORITY :
                {
                    idleThreads_[HIGH_PRIORITY].add(repoolingThread);
                    break;
                }
                default :
                    throw new IllegalStateException("Illegal priority found while repooling a Thread!");
            }
            notifyAll();//通知全部的線程
        }
        else 
        {
            if (debug_)
            {
                System.out.println("ThreadPool.repool() : Destroying incoming thread.");
            }
            repoolingThread.shutDown();//關閉線程
        }
        if (debug_) 
        {
            System.out.println("ThreadPool.recycle() : done.");
        }
    }
    
    public void setDebug(boolean newDebug) 
    {
        debug_ = newDebug;
    }
    
    //中止池中全部線程
    public synchronized void shutdown()
    {
        shutDown_ = true;
        if (debug_)
        {
            System.out.println("ThreadPool : shutting down ");
        }
        for (int prioIndex = 0; prioIndex <= HIGH_PRIORITY; prioIndex++)
        {
            List prioThreads = idleThreads_[prioIndex];
            for (int threadIndex = 0; threadIndex < prioThreads.size(); threadIndex++)
            {
                PooledThread idleThread = (PooledThread) prioThreads.get(threadIndex);
                idleThread.shutDown();
            }
        }
        notifyAll();
        if (debug_)
        {
            System.out.println("ThreadPool : shutdown done.");
        }
    }
    
    //以Runnable爲target,從池中選擇一個優先級爲priority的線程建立線程
    //並讓線程運行。
    public synchronized void start(Runnable target, int priority)
    {
        PooledThread thread = null;  //被選出來執行target的線程
        List idleList = idleThreads_[priority];
        if (idleList.size() > 0) 
        {
            //若是池中相應優先級的線程有空閒的,那麼從中取出一個
            //設置它的target,並喚醒它
            //從空閒的線程隊列中獲取
            int lastIndex = idleList.size() - 1;
            thread = (PooledThread) idleList.get(lastIndex);
            idleList.remove(lastIndex);
            thread.setTarget(target);
        }
        //池中沒有相應優先級的線程
        else 
        { 
            threadCreationCounter_++;
            // 建立新線程,
            thread = new PooledThread(target, "PooledThread #" + threadCreationCounter_, this);
            // 新線程放入池中
            switch (priority) 
            {
                case LOW_PRIORITY :
                {
                    thread.setPriority(Thread.MIN_PRIORITY);
                    break;
                }
                case NORMAL_PRIORITY :
                {
                    thread.setPriority(Thread.NORM_PRIORITY);
                    break;
                }
                case HIGH_PRIORITY :
                {
                    thread.setPriority(Thread.MAX_PRIORITY);
                    break;
                }
                default :
                {
                    thread.setPriority(Thread.NORM_PRIORITY);
                    break;
                }
            }
            //啓動這個線程
            thread.start();
        }
    }
}

工做者線程代碼:多線程

public class PooledThread extends Thread 
{
    private ThreadPool pool_;  // 池中線程須要知道本身所在的池
    private Runnable target_;   // 線程的任務
    private boolean shutDown_ = false;
    private boolean idle_ = false;//設置是否讓線程處於等待狀態
    
    private PooledThread() {
        super();
    }
    
    private PooledThread(Runnable target)
    {
        super(target); //初始化父類
    }
    
    private PooledThread(Runnable target, String name) 
    {
        super(target, name);
    }
    
    public PooledThread(Runnable target, String name, ThreadPool pool)
    {
        super(name);
        pool_ = pool;
        target_ = target;
    }
    
    private PooledThread(String name) 
    {
        super(name);//初始化父類
    }
    
    private PooledThread(ThreadGroup group, Runnable target)
    {
        super(group, target);
    }
    
    private PooledThread(ThreadGroup group, Runnable target, String name) 
    {
        super(group, target, name);
    }
    
    private PooledThread(ThreadGroup group, String name) 
    {
        super(group, name);
    }
    
    public java.lang.Runnable getTarget() 
    {
        return target_;
    }
    
    public boolean isIdle() 
    {
        return idle_;//返回當前的狀態
    }
    
    //工做者線程與一般線程不一樣之處在於run()方法的不一樣。一般的線程,
    //完成線程應該執行的代碼後,天然退出,線程結束。
    //虛擬機在線程結束後收回分配給線程的資源,線程對象被垃圾回收。]
    //而這在池化的工做者線程中是應該避免的,不然線程池就失去了意義。
    //做爲能夠被放入池中並從新利用的工做者線程,它的run()方法不該該結束,
    //隨意,在隨後能夠看到的實現中,run()方法執行完target對象的代碼後,
    //就將自身repool(),而後調用wait()方法,使本身睡眠而不是退出循環和run()。
    //這就使線程池實現的要點。
    public void run() 
    {
        // 這個循環不能結束,除非池類要求線程結束
        // 每一次循環都會執行一次池類分配給的任務target
        while (!shutDown_) 
        {  
            idle_ = false;
            if (target_ != null) 
            {
                target_.run();  // 運行target中的代碼
            }
            idle_ = true;
            try 
            {
                //線程通知池從新將本身放回到池中
                pool_.repool(this);  // 
                //進入池中後睡眠,等待被喚醒執行新的任務,
                //這裏是線程池中線程於普通線程的run()不一樣的地方。
                synchronized (this) 
                {
                    wait();
                }
            }
            catch (InterruptedException ie)
            {
            }
            idle_ = false;
        }
        //循環這裏不能結束,不然線程結束,資源被VM收回,
        //就沒法起到線程池的做用了
    }
    
    
    public synchronized void setTarget(java.lang.Runnable newTarget) 
    {//設置新的target,並喚醒睡眠中的線程
        target_ = newTarget;  // 新任務
        notifyAll();          // 喚醒睡眠的線程
    }
    
    public synchronized void shutDown()
    {
        shutDown_ = true;
        notifyAll();
    }
}

測試代碼:函數

public static void main(String[] args)
    {
        System.out.println("Testing ThreadPool ");
        System.out.println("Creating ThreadPool ");
        ThreadPool pool = ThreadPool.instance();
        pool.setDebug(true);
        class TestRunner implements Runnable 
        {
            public int count = 0;
            public void run() 
            {
                System.out.println("Testrunner sleeping 5 seconds ");
                //此方法使本線程睡眠5秒
                synchronized (this) 
                {
                    try 
                    {
                        wait(5000);//等待5秒時間
                    }
                    catch (InterruptedException ioe) 
                    {
                    }
                }
                System.out.println("Testrunner leaving  ");
                count++;
            }
        }
        System.out.println("Starting a new thread ");
        TestRunner runner = new TestRunner();
        pool.start(runner, pool.HIGH_PRIORITY);
        System.out.println("count : " + runner.count);
        System.out.println("Thread count : " + pool.getCreatedThreadsCount());
        pool.shutdown();
    }
}

結果
測試

Testing ThreadPool 
Creating ThreadPool 

Starting a new thread 

Testrunner sleeping 
5 seconds 
count : 
0
Thread count : 
1
ThreadPool : shutting down 

ThreadPool : shutdown done
.
Testrunner leaving  

ThreadPool
.repool() : Destroying incoming thread.
ThreadPool
.recycle() : done.
this

相關文章
相關標籤/搜索