本身動手寫一個線程池

基礎知識:java

    http://blog.csdn.net/i_lovefish/article/details/8042708數組

一。ExcutorService和ThreadGroup的比較jvm

    在使用線程池的時候,大多數狀況咱們會使用excutors類來建立一個線程池。過程再也不贅述。但咱們發現,ExcutorService中對線程的操做只有submit、shutdown、shutdownnow等寥寥幾個方法。對線程的管理並不細緻。而ThreadGroup類中,有不少諸如獲取存活線程數量、獲取全部線程、中斷線程等諸多方法。因此咱們考慮用ThreadGroup來模擬一個線程池。
ide

二。經過繼承ThreadGroup模擬一個線程池
this

public class ThreadPoolTest extends ThreadGroup{
    //線程池是否開啓
    private boolean isAlive;
    //線程池中的任務隊列
    private LinkedList taskQueue;
    //線程池中的線程id
    private int threadID;
    //線程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //設置爲守護線程,表示當線程池中的全部線程都銷燬時(interrupt方法執行),該線程池自動銷燬
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 聲明一些線程,並讓這些線程運行
        }
    }
  }

這裏的註釋TODO的部分很重要,須要作的功能以下:.net

  1. 當任務隊列爲null時,咱們須要讓咱們聲明的線程等待s。
    線程

  2. 當任務隊列中不爲null是,咱們須要喚醒咱們的線程,拿到任務隊列中的任務去執行。
    code

綜上,咱們須要new一些個性化的線程,讓這些線程去任務隊列裏取任務線程,根據隊列的狀況執行不一樣操做。blog

方便起見,咱們直接在ThreadPoolTest裏寫一個繼承了Thread的內部類,Todo部分改成          
繼承

new PooledThread().start();

private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //若是該線程沒有被終止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task==null){
                    //之因此沒有用wait()是由於wait和notify須要在同步代碼塊或者同步方法中執行
                    //之因此沒用task.wait()是由於task爲null
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //當線程族中的線程有未被捕獲的異常時,jvm會去調用uncaughtException方法
                    //這個方法也是threadGroup的一個特點
                    uncaughtException(this, e);
                }
            }
        }
    }

上述代碼中涉及到了getTask方法,該方法應該寫在ThreadPoolTest中,用來獲取任務隊列中的線程

protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();//wait在這那!!說明若是隊列中沒有任務,則一直處於阻塞狀態
        }
        return (Runnable) this.taskQueue.removeFirst();
    }

下面開始實現線程池的submit方法,往隊列中添加任務,喚醒getTask

//添加新任務
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //將工做線程放到任務隊列的尾部
            this.taskQueue.add(task);
            //通知工做線程取任務
            notify();//getTask被喚醒啦,構造方法中new出來的線程由阻塞狀態進入可運行狀態了
        }        
    }

重點方法介紹完畢,咱們再加一些shutdown,shutdownnow方法就簡單多了

    

public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //終止線程池中的全部線程
        this.interrupt();
    }
    //關閉線程池,並等待線程池中的全部任務被運行完,但不能接受新的任務
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //將線程池中的活動線程拷貝到新建立的線程數組thread中
        Thread[] threads=new Thread[this.activeCount()];
        //將線程池中活動的線程拷貝到新建立的線程數組中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待全部線程執行結束
                threads[i].join();
            } catch (InterruptedException e) {
                     e.printStackTrace();
            }
        }
        System.out.println("全部線程運行完畢");
    }

所有代碼以下

package com.me.threadtest;

import java.util.LinkedList;


public class ThreadPoolTest extends ThreadGroup{
    //線程池是否開啓
    private boolean isAlive;
    //線程池中的任務隊列
    private LinkedList taskQueue;
    //線程池中的線程id
    private int threadID;
    //線程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //設置爲守護線程,表示當線程池中的全部線程都銷燬時,該線程池自動銷燬
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 聲明一些線程,並讓這些線程運行
            new PooledThread().start();
        }
    }
    //添加新任務
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //將工做線程放到任務隊列的尾部
            this.taskQueue.add(task);
            //通知工做線程取任務
            //TODO 不懂
            notify();
        }
        
    }
    //獲取任務
    protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();
        }
        return (Runnable) this.taskQueue.removeFirst();
    }
    public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //終止線程池中的全部線程
        this.interrupt();
    }
    //關閉線程池,並等待線程池中的全部任務被運行完,但不能接受新的任務
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //將線程池中的活動線程拷貝到新建立的線程數組thread中
        Thread[] threads=new Thread[this.activeCount()];
        //將線程池中活動的線程拷貝到新建立的線程數組中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待全部線程執行結束
                threads[i].join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("全部線程運行完畢");
    }
    private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //若是該線程沒有被終止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(task==null){
//本想經過下面註釋掉的代碼實現‘隊列爲null時等待’,但發現實現不了,因此在gettask中實現吧
//                    task.wait();
//                    wait();
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //當線程族中的線程有未被捕獲的異常時,jvm會去調用uncaughtException方法
                    uncaughtException(this, e);
                }
            }
        }
    }
}

使用的時候,好比咱們設置的線程數爲5,添加到隊列中的線程數爲10,那麼同時運行的線程只有5個,ExcutorService中還有一個隊列的最大長度,這個在上述代碼中在添加一個maxSize的屬性,並改動少數代碼就能夠實現了,很是簡單,再也不贅述。

相關文章
相關標籤/搜索