前序:
java
Thread-Per-Message Pattern,是一種對於每一個命令或請求,都分配一個線程,由這個線程執行工做。它將「委託消息的一端」和「執行消息的一端」用兩個不一樣的線程來實現。該線程模式主要包括三個部分:編程
1,Request參與者(委託人),也就是消息發送端或者命令請求端設計模式
2,Host參與者,接受消息的請求,負責爲每一個消息分配一個工做線程。數組
3,Worker參與者,具體執行Request參與者的任務的線程,由Host參與者來啓動。緩存
因爲常規調用一個方法後,必須等待該方法徹底執行完畢後才能繼續執行下一步操做,而利用線程後,就沒必要等待具體任務執行完畢,就能夠立刻返回繼續執行下一步操做。安全
背景:網絡
因爲在Thread-Per-Message Pattern中對於每個請求都會生成啓動一個線程,而線程的啓動是很花費時間的工做,因此鑑於此,提出了Worker Thread,重複利用已經啓動的線程。數據結構
線程池:多線程
Worker Thread,也稱爲工人線程或背景線程,不過通常都稱爲線程池。該模式主要在於,事先啓動必定數目的工做線程。當沒有請求工做的時候,全部的工人線程都會等待新的請求過來,一旦有工做到達,就立刻從線程池中喚醒某個線程來執行任務,執行完畢後繼續在線程池中等待任務池的工做請求的到達。併發
任務池:主要是存儲接受請求的集合,利用它能夠緩衝接受到的請求,能夠設置大小來表示同時可以接受最大請求數目。這個任務池主要是供線程池來訪問。
線程池:這個是工做線程所在的集合,能夠經過設置它的大小來提供併發處理的工做量。對於線程池的大小,能夠事先生成必定數目的線程,根據實際狀況來動態增長或者減小線程數目。線程池的大小不是越大越好,線程的切換也會耗時的。
存放池的數據結構,能夠用數組也能夠利用集合,在集合類中通常使用Vector,這個是線程安全的。
Worker Thread的全部參與者:
1,Client參與者,發送Request的參與者
2,Channel參與者,負責緩存Request的請求,初始化啓動線程,分配工做線程
3,Worker參與者,具體執行Request的工做線程
4,Request參與者
注意:將在Worker線程內部等待任務池非空的方式稱爲正向等待。
將在Channel線程提供Worker線程來判斷任務池非空的方式稱爲反向等待。
線程池實例1:
利用同步方法來實現,使用數組來做爲任務池的存放數據結構。在Channel有緩存請求方法和處理請求方法,利用生成者與消費者模式來處理存儲請求,利用反向等待來判斷任務池的非空狀態。
Channel參與者:
package whut.threadpool; //用到了生產者與消費者模式 //生成線程池,接受客戶端線程的請求,找到一個工做線程分配該客戶端請求 public class Channel { private static final int MAX_REQUEST = 100;// 併發數目,就是同時能夠接受多少個客戶端請求 //利用數組來存放請求,每次從數組末尾添加請求,從開頭移除請求來處理 private final Request[] requestQueue;// 存儲接受客戶線程的數目 private int tail;//下一次存放Request的位置 private int head;//下一次獲取Request的位置 private int count;// 當前request數量 private final WorkerThread[] threadPool;// 存儲線程池中的工做線程 // 運用數組來存儲 public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.head = 0; this.count = 0; threadPool = new WorkerThread[threads]; // 啓動工做線程 for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new WorkerThread("Worker-" + i, this); } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start(); } } // 接受客戶端請求線程 public synchronized void putRequest(Request request) { // 當Request的數量大於或等於同時接受的數目時候,要等待 while (count >= requestQueue.length) try { wait(); } catch (InterruptedException e) { } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } // 處理客戶端請求線程 public synchronized Request takeRequest() { while (count <= 0) try { wait(); } catch (InterruptedException e) { } Request request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } }
客戶端請求線程
package whut.threadpool; import java.util.Random; //向Channel發送Request請求的 public class ClientThread extends Thread{ private final Channel channel; private static final Random random=new Random(); public ClientThread(String name,Channel channel) { super(name); this.channel=channel; } public void run() { try{ for(int i=0;true;i++) { Request request=new Request(getName(),i); channel.putRequest(request); Thread.sleep(random.nextInt(1000)); } }catch(InterruptedException e) { } } }
工做線程:
package whut.threadpool; //具體工做線程 public class WorkerThread extends Thread{ private final Channel channel; public WorkerThread(String name,Channel channel) { super(name); this.channel=channel; } public void run() { while(true) { Request request=channel.takeRequest(); request.execute(); } } }
線程池實例2:
利用同步塊來處理,利用Vector來存儲客戶端請求。在Channel有緩存請求方法和處理請求方法,利用生成者與消費者模式來處理存儲請求,利用正向等待來判斷任務池的非空狀態。
這種實例,能夠借鑑到網絡ServerSocket處理用戶請求的模式中,有很好的擴展性與實用性。
利用Vector來存儲,依舊是每次集合的最後一個位置添加請求,從開始位置移除請求來處理。
Channel參與者:
package whut.threadpool2; import java.util.Vector; /* * 這個主要的做用以下 * 0,緩衝客戶請求線程(利用生產者與消費者模式) * 1,存儲客戶端請求的線程 * 2,初始化啓動必定數量的線程 * 3,主動來喚醒處於任務池中wait set的一些線程來執行任務 */ public class Channel { public final static int THREAD_COUNT=4; public static void main(String[] args) { //定義兩個集合,一個是存放客戶端請求的,利用Vector, //一個是存儲線程的,就是線程池中的線程數目 //Vector是線程安全的,它實現了Collection和List //Vector 類能夠實現可增加的對象數組。與數組同樣, //它包含能夠使用整數索引進行訪問的組件。但Vector 的大小能夠根據須要增大或縮小, //以適應建立 Vector 後進行添加或移除項的操做。 //Collection中主要包括了list相關的集合以及set相關的集合,Queue相關的集合 //注意:Map不是Collection的子類,都是java.util.*下的同級包 Vector pool=new Vector(); //工做線程,初始分配必定限額的數目 WorkerThread[] workers=new WorkerThread[THREAD_COUNT]; //初始化啓動工做線程 for(int i=0;i<workers.length;i++) { workers[i]=new WorkerThread(pool); workers[i].start(); } //接受新的任務,而且將其存儲在Vector中 Object task=new Object();//模擬的任務實體類 //此處省略具體工做 //在網絡編程中,這裏就是利用ServerSocket來利用ServerSocket.accept接受一個Socket從而喚醒線程 //當有具體的請求達到 synchronized(pool) { pool.add(pool.size(), task); pool.notifyAll();//通知全部在pool wait set中等待的線程,喚醒一個線程進行處理 } //注意上面這步驟添加任務池請求,以及通知線程,均可以放在工做線程內部實現 //只須要定義該方法爲static,在方法體用同步塊,且共享的線程池也是static便可 //下面這步,能夠有能夠沒有根據實際狀況 //取消等待的線程 for(int i=0;i<workers.length;i++) { workers[i].interrupt(); } } }
工做線程:
package whut.threadpool2; import java.util.List; public class WorkerThread extends Thread { private List pool;//任務請求池 private static int fileCompressed=0;//全部實例共享的 public WorkerThread(List pool) { this.pool=pool; } //利用靜態synchronized來做爲整個synchronized類方法,僅能同時一個操做該類的這個方法 private static synchronized void incrementFilesCompressed() { fileCompressed++; } public void run() { //保證無限循環等待中 while(true) { //共享互斥來訪問pool變量 synchronized(pool) { //利用多線程設計模式中的 //Guarded Suspension Pattern,警惕條件爲pool不爲空,不然無限的等待中 while(pool.isEmpty()) { try{ pool.wait();//進入pool的wait set中等待着,釋放了pool的鎖 }catch(InterruptedException e) { } } //當線程被喚醒,須要從新獲取pool的鎖, //再次繼續執行synchronized代碼塊中其他的工做 //當不爲空的時候,繼續再判斷是否爲空,若是不爲空,則跳出循環 //必須先從任務池中移除一個任務來執行,統一用從末尾添加,從開始處移除 pool.remove(0);//獲取任務池中的任務,而且要進行轉換 } //下面是線程所要處理的具體工做 } } }