java 線程池 並行 執行

https://github.com/donaldlee2008/JerryMultiThread/blob/master/src/com/jerry/threadpool/ThreadPoolTest.javajava

https://github.com/donaldlee2008/JerryMultiThread/tree/master/src/com/jerry/threadpoolgit

 

 

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.Callable;github

public class ThreadPoolTest {

public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool(10); //建立一個有個3工做線程的線程池
Thread.sleep(500); //休眠500毫秒,以便讓線程池中的工做線程所有運行
//運行任務
for (int i = 0; i <=5 ; i++) { //建立6個任務
// threadPool.execute(createTask(i));
}

// ArrayList<Callable> clist =new ArrayList<Callable>();
for(int i=0;i<100;i++){
ArrayList<Integer> list =new ArrayList<Integer>();
int k=getrandom(1000000,100000);
for(int x=0;x<k ;x++){
list.add(getrandom(1000000,0));
}
threadPool.execute(createTask2(list,i));
// clist.add(c3);


}
threadPool.waitFinish(); //等待全部任務執行完畢
threadPool.closePool(); //關閉線程池dom

}
public static int getrandom(int max,int min) {
// int max=20;
//int min=10;
Random random = new Random();
int s = random.nextInt(max)%(max-min+1) + min;
// System.out.println(s);.
return s;
}ide

private static Runnable createTask(final int taskID) {
return new Runnable() {
public void run() {
// System.out.println("Task" + taskID + "開始");
System.out.println("Hello world");
// System.out.println("Task" + taskID + "結束");
}
};
}
private static Runnable createTask2(ArrayList<Integer> list,final int taskID) {
return new Runnable() {
public void run() {
// System.out.println("Task" + taskID + "開始");
//System.out.println("Hello world");
Collections.sort(list, new Comparator<Integer>() {

@Override
public int compare(Integer o1, Integer o2) {
if (o1< o2)
return -1;
else if (o1 > o2)
return 1;
else
return o1.compareTo(o2);
}
});
System.out.println("Task" + taskID + "結束"+list.size());
}
};
}

// Collections.sort(list, new Comparator<Integer>() {
//
// @Override
// public int compare(Integer o1, Integer o2) {
// if (o1< o2)
// return -1;
// else if (o1 > o2)
// return 1;
// else
// return o1.compareTo(o2);
// }
// });
}工具

 

 

 

 

 

import java.util.LinkedList;this

/**
* Java線程池工具類
* @author Jerry Wang
*
*/
public class ThreadPool extends ThreadGroup {
private boolean isClosed = false; //線程池是否關閉
private LinkedList<Runnable> workQueue; //工做隊列
private static int threadPoolID = 1; //線程池的id
public ThreadPool(int poolSize) { //poolSize 表示線程池中的工做線程的數量線程

super(threadPoolID + ""); //指定ThreadGroup的名稱
setDaemon(true); //繼承到的方法,設置是否守護線程池
workQueue = new LinkedList<Runnable>(); //建立工做隊列
for(int i = 0; i < poolSize; i++) {
new WorkThread(i).start(); //建立並啓動工做線程,線程池數量是多少就建立多少個工做線程
}
}

/** 向工做隊列中加入一個新任務,由工做線程去執行該任務*/
public synchronized void execute(Runnable task) {
if(isClosed) {
throw new IllegalStateException();
}
if(task != null) {
workQueue.add(task);//向隊列中加入一個任務
notify(); //喚醒一個正在getTask()方法中待任務的工做線程
}
}

/** 從工做隊列中取出一個任務,工做線程會調用此方法*/
private synchronized Runnable getTask(int threadid) throws InterruptedException {
while(workQueue.size() == 0) {
if(isClosed) return null;
System.out.println("工做線程"+threadid+"等待任務...");
wait(); //若是工做隊列中沒有任務,就等待任務
}
System.out.println("工做線程"+threadid+"開始執行任務...");
return (Runnable) workQueue.removeFirst(); //反回隊列中第一個元素,並從隊列中刪除
}

/** 關閉線程池 */
public synchronized void closePool() {
if(! isClosed) {
waitFinish(); //等待工做線程執行完畢
isClosed = true;
workQueue.clear(); //清空工做隊列
interrupt(); //中斷線程池中的全部的工做線程,此方法繼承自ThreadGroup類
}
}

/** 等待工做線程把全部任務執行完畢*/
public void waitFinish() {
synchronized (this) {
isClosed = true;
notifyAll(); //喚醒全部還在getTask()方法中等待任務的工做線程
}
Thread[] threads = new Thread[activeCount()]; //activeCount() 返回該線程組中活動線程的估計值。
int count = enumerate(threads); //enumerate()方法繼承自ThreadGroup類,根據活動線程的估計值得到線程組中當前全部活動的工做線程
for(int i =0; i < count; i++) { //等待全部工做線程結束
try {
threads[i].join(); //等待工做線程結束
}catch(InterruptedException ex) {
ex.printStackTrace();
}
}
}繼承

/**
* 內部類,工做線程,負責從工做隊列中取出任務,並執行
* @author sunnylocus
*/
private class WorkThread extends Thread {
private int id;
public WorkThread(int id) {
//父類構造方法,將線程加入到當前ThreadPool線程組中
super(ThreadPool.this,id+"");
this.id =id;
}

@Override
public void run() {
while(! isInterrupted()) { //isInterrupted()方法繼承自Thread類,判斷線程是否被中斷
Runnable task = null;
try {
task = getTask(id); //取出任務
}catch(InterruptedException ex) {
ex.printStackTrace();
}
//若是getTask()返回null或者線程執行getTask()時被中斷,則結束此線程
if(task == null) return;

try {
task.run(); //運行任務
}catch(Throwable t) {
t.printStackTrace();
}
}// end while
}// end run
}// end workThread
}隊列

相關文章
相關標籤/搜索