Java多線程設計模式(4)線程池模式

前序:
java

   Thread-Per-Message Pattern,是一種對於每一個命令或請求,都分配一個線程,由這個線程執行工做。它將「委託消息的一端」和「執行消息的一端」用兩個不一樣的線程來實現。該線程模式主要包括三個部分:編程

   1,Request參與者(委託人),也就是消息發送端或者命令請求端設計模式

   2,Host參與者,接受消息的請求,負責爲每一個消息分配一個工做線程。數組

   3,Worker參與者,具體執行Request參與者的任務的線程,由Host參與者來啓動。緩存

   因爲常規調用一個方法後,必須等待該方法徹底執行完畢後才能繼續執行下一步操做,而利用線程後,就沒必要等待具體任務執行完畢,就能夠立刻返回繼續執行下一步操做。安全

背景:網絡

   因爲在Thread-Per-Message Pattern中對於每個請求都會生成啓動一個線程,而線程的啓動是很花費時間的工做,因此鑑於此,提出了Worker Thread,重複利用已經啓動的線程。數據結構

線程池:多線程

   Worker Thread,也稱爲工人線程或背景線程,不過通常都稱爲線程池。該模式主要在於,事先啓動必定數目的工做線程。當沒有請求工做的時候,全部的工人線程都會等待新的請求過來,一旦有工做到達,就立刻從線程池中喚醒某個線程來執行任務,執行完畢後繼續在線程池中等待任務池的工做請求的到達。併發

   任務池:主要是存儲接受請求的集合,利用它能夠緩衝接受到的請求,能夠設置大小來表示同時可以接受最大請求數目。這個任務池主要是供線程池來訪問。

   線程池:這個是工做線程所在的集合,能夠經過設置它的大小來提供併發處理的工做量。對於線程池的大小,能夠事先生成必定數目的線程,根據實際狀況來動態增長或者減小線程數目。線程池的大小不是越大越好,線程的切換也會耗時的。

   存放池的數據結構,能夠用數組也能夠利用集合,在集合類中通常使用Vector,這個是線程安全的。

  Worker Thread的全部參與者:

   1,Client參與者,發送Request的參與者

   2,Channel參與者,負責緩存Request的請求,初始化啓動線程,分配工做線程

   3,Worker參與者,具體執行Request的工做線程

   4,Request參與者

注意:將在Worker線程內部等待任務池非空的方式稱爲正向等待

     將在Channel線程提供Worker線程來判斷任務池非空的方式稱爲反向等待

線程池實例1:

   利用同步方法來實現,使用數組來做爲任務池的存放數據結構。在Channel有緩存請求方法和處理請求方法,利用生成者與消費者模式來處理存儲請求,利用反向等待來判斷任務池的非空狀態。

Channel參與者:

package whut.threadpool;
//用到了生產者與消費者模式
//生成線程池,接受客戶端線程的請求,找到一個工做線程分配該客戶端請求
public class Channel {
    private static final int MAX_REQUEST = 100;// 併發數目,就是同時能夠接受多少個客戶端請求
    //利用數組來存放請求,每次從數組末尾添加請求,從開頭移除請求來處理
    private final Request[] requestQueue;// 存儲接受客戶線程的數目
    private int tail;//下一次存放Request的位置
    private int head;//下一次獲取Request的位置
    private int count;// 當前request數量
    private final WorkerThread[] threadPool;// 存儲線程池中的工做線程
    // 運用數組來存儲
    public Channel(int threads) {
        this.requestQueue = new Request[MAX_REQUEST];
        this.head = 0;
        this.head = 0;
        this.count = 0;
        threadPool = new WorkerThread[threads];
        // 啓動工做線程
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i] = new WorkerThread("Worker-" + i, this);
        }
    }
    public void startWorkers() {
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i].start();
        }
    }
    // 接受客戶端請求線程
    public synchronized void putRequest(Request request) {
        // 當Request的數量大於或等於同時接受的數目時候,要等待
        while (count >= requestQueue.length)
            try {
                wait();
            } catch (InterruptedException e) {
            }
        requestQueue[tail] = request;
        tail = (tail + 1) % requestQueue.length;
        count++;
        notifyAll();
    }
    // 處理客戶端請求線程
    public synchronized Request takeRequest() {
        while (count <= 0)
            try {
                wait();
            } catch (InterruptedException e) {
            }
        Request request = requestQueue[head];
        head = (head + 1) % requestQueue.length;
        count--;
        notifyAll();
        return request;
    }
}

客戶端請求線程

package whut.threadpool;
import java.util.Random;
//向Channel發送Request請求的
public class ClientThread extends Thread{
    private final Channel channel;
    private static final Random random=new Random();
                                                              
    public ClientThread(String name,Channel channel)
    {
        super(name);
        this.channel=channel;
    }
    public void run()
    {
        try{
            for(int i=0;true;i++)
            {
                Request request=new Request(getName(),i);
                channel.putRequest(request);
                Thread.sleep(random.nextInt(1000));
            }
        }catch(InterruptedException e)
        {
        }
    }
}

工做線程:

package whut.threadpool;
//具體工做線程
public class WorkerThread extends Thread{
                                                     
    private final Channel channel;
    public WorkerThread(String name,Channel channel)
    {
      super(name);
      this.channel=channel;
    }
                                                     
    public void run()
    {
        while(true)
        {
            Request request=channel.takeRequest();
            request.execute();
        }
    }
}

線程池實例2:

   利用同步塊來處理,利用Vector來存儲客戶端請求。在Channel有緩存請求方法和處理請求方法,利用生成者與消費者模式來處理存儲請求,利用正向等待來判斷任務池的非空狀態。

   這種實例,能夠借鑑到網絡ServerSocket處理用戶請求的模式中,有很好的擴展性與實用性。

   利用Vector來存儲,依舊是每次集合的最後一個位置添加請求,從開始位置移除請求來處理

Channel參與者:

package whut.threadpool2;
import java.util.Vector;
/*
 * 這個主要的做用以下
 * 0,緩衝客戶請求線程(利用生產者與消費者模式)
 * 1,存儲客戶端請求的線程
 * 2,初始化啓動必定數量的線程
 * 3,主動來喚醒處於任務池中wait set的一些線程來執行任務
 */
public class Channel {
    public final static int THREAD_COUNT=4;
    public static void main(String[] args) {
      //定義兩個集合,一個是存放客戶端請求的,利用Vector,
      //一個是存儲線程的,就是線程池中的線程數目
                            
      //Vector是線程安全的,它實現了Collection和List
      //Vector 類能夠實現可增加的對象數組。與數組同樣,
      //它包含能夠使用整數索引進行訪問的組件。但Vector 的大小能夠根據須要增大或縮小,
      //以適應建立 Vector 後進行添加或移除項的操做。
      //Collection中主要包括了list相關的集合以及set相關的集合,Queue相關的集合
      //注意:Map不是Collection的子類,都是java.util.*下的同級包
      Vector pool=new Vector();
      //工做線程,初始分配必定限額的數目
      WorkerThread[] workers=new WorkerThread[THREAD_COUNT];
                         
      //初始化啓動工做線程
      for(int i=0;i<workers.length;i++)
      {
          workers[i]=new WorkerThread(pool);
          workers[i].start();
      }
                          
      //接受新的任務,而且將其存儲在Vector中
      Object task=new Object();//模擬的任務實體類
      //此處省略具體工做
      //在網絡編程中,這裏就是利用ServerSocket來利用ServerSocket.accept接受一個Socket從而喚醒線程
                          
      //當有具體的請求達到
      synchronized(pool)
      {
          pool.add(pool.size(), task);
          pool.notifyAll();//通知全部在pool wait set中等待的線程,喚醒一個線程進行處理
      }
      //注意上面這步驟添加任務池請求,以及通知線程,均可以放在工做線程內部實現
      //只須要定義該方法爲static,在方法體用同步塊,且共享的線程池也是static便可
                          
      //下面這步,能夠有能夠沒有根據實際狀況
      //取消等待的線程
      for(int i=0;i<workers.length;i++)
      {
          workers[i].interrupt();
      }
    }
}

工做線程:

package whut.threadpool2;
import java.util.List;
public class WorkerThread extends Thread {
    private List pool;//任務請求池
    private static int fileCompressed=0;//全部實例共享的
                    
    public WorkerThread(List pool)
    {
          this.pool=pool;  
    }
                    
    //利用靜態synchronized來做爲整個synchronized類方法,僅能同時一個操做該類的這個方法
    private static synchronized void incrementFilesCompressed()
    {
        fileCompressed++;
    }
                    
    public void run()
    {
        //保證無限循環等待中
        while(true)
        {
            //共享互斥來訪問pool變量
            synchronized(pool)
            {
                //利用多線程設計模式中的
                //Guarded Suspension Pattern,警惕條件爲pool不爲空,不然無限的等待中
                while(pool.isEmpty())
                {
                    try{
                        pool.wait();//進入pool的wait set中等待着,釋放了pool的鎖
                    }catch(InterruptedException e)
                    {
                    }
                }
                //當線程被喚醒,須要從新獲取pool的鎖,
                //再次繼續執行synchronized代碼塊中其他的工做
                //當不爲空的時候,繼續再判斷是否爲空,若是不爲空,則跳出循環
                //必須先從任務池中移除一個任務來執行,統一用從末尾添加,從開始處移除
                                
                pool.remove(0);//獲取任務池中的任務,而且要進行轉換
            }
            //下面是線程所要處理的具體工做
        }
    }
}
相關文章
相關標籤/搜索