1、線程池源碼如下
一、阻塞任務隊列 BlockingQueue
/**
 * Minimal task-queue abstraction used by the thread pool in this example.
 *
 * @param <E> the type of elements held in the queue
 */
public interface BlockingQueue<E> {
    /**
     * Tries to append an element to the queue without waiting.
     *
     * @param e the element to add
     * @return {@code true} if the element was accepted, {@code false} if the queue refused it
     *         (for example because it is full)
     */
    boolean offer(E e);

    /**
     * Removes and returns the element at the front of the queue.
     *
     * <p>Callers in this example treat a {@code null} result as "no element available".
     *
     * @return the removed element, or {@code null} if none is available
     */
    E take();
}
阻塞任務隊列實現類 LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger;
public class LinkedBlockingQueue<E> implements BlockingQueue<E> {
private final AtomicInteger count = new AtomicInteger();
private final int capacity;
transient Node<E> head;
private transient Node<E> last;
public LinkedBlockingQueue(int capacity) {
this.capacity = capacity;
last = head = new Node<E>(null);
}
//向阻塞隊列的尾部添加一個任務
@Override
public boolean offer(E e) {
//若是已達到隊列的容量,則拒接添加,返回false
if(count.get() == capacity) {
return false;
}
Node<E> node = new Node<E>(e);
last = last.next = node;
count.getAndIncrement();
return true;
}測試
//從阻塞隊列的頭部取出一個任務
@Override
public E take() {
Node<E> h = head.next;
if(h == null) {
return null;
}
E x = h.item;
head = h;
return x;
}
static class Node<E>{
E item;
Node<E> next;
Node(E x){
item = x;
}
}
}this
二、建立線程的工廠 ThreadFactory
/**
 * Creates the threads that a thread pool uses to run its workers.
 */
public interface ThreadFactory {
    /**
     * Builds a new, not-yet-started thread that will execute the given runnable.
     *
     * @param r the code the new thread should run
     * @return the constructed thread
     */
    Thread newThread(Runnable r);
}
建立線程的工廠實現類 MyThreadFactory
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Thread factory that names its threads {@code demo1}, {@code demo2}, ... in creation order.
 */
public class MyThreadFactory implements ThreadFactory {
    /** Sequence number handed to the next thread; starts at 1. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /**
     * Wraps the runnable in a new, unstarted thread carrying the next sequential name.
     *
     * @param r the code the thread should run
     * @return the newly created thread
     */
    @Override
    public Thread newThread(Runnable r) {
        final int sequence = threadNumber.getAndIncrement();
        final String threadName = "demo" + sequence;
        return new Thread(r, threadName);
    }
}
三、線程池接口 ExecutorService
/**
 * Entry point for submitting work to a thread pool.
 */
public interface ExecutorService {
/**
 * Hands a task to the pool for execution.
 *
 * @param command the task to run
 */
void execute(Runnable command);
}
線程池實現類 ThreadPoolExecutor
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A minimal thread pool with a core size, a maximum size and a bounded work queue.
 *
 * <p>Submission policy, in order:
 * <ol>
 *   <li>while fewer than {@code corePoolSize} workers are alive, start a new worker for the task;</li>
 *   <li>otherwise try to queue the task;</li>
 *   <li>if the queue refuses it, start an extra worker up to {@code maximumPoolSize};</li>
 *   <li>if that also fails, the task is rejected and a message is printed.</li>
 * </ol>
 *
 * <p>A worker exits as soon as the queue hands it {@code null}. Every change to the worker set
 * and the worker count happens while holding the {@code workers} monitor.
 *
 * <p>NOTE(review): workers call {@code workQueue.take()} outside that monitor, so the supplied
 * queue is assumed to tolerate concurrent {@code offer}/{@code take} calls.
 */
public class ThreadPoolExecutor implements ExecutorService {
    /** Live workers; this set is also the lock guarding pool-size changes. */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /** Number of live workers of THIS pool (per instance, not shared between pools). */
    private final AtomicInteger ctl = new AtomicInteger(0);

    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final int corePoolSize;
    private final int maximumPoolSize;

    /**
     * Creates a pool; no threads are started until tasks are submitted.
     *
     * @param corePoolSize    number of workers started before tasks begin to be queued
     * @param maximumPoolSize upper bound on the number of simultaneously live workers
     * @param workQueue       queue holding tasks that are waiting for a worker
     * @param threadFactory   factory used to create worker threads
     * @throws IllegalArgumentException if {@code corePoolSize < 0}, {@code maximumPoolSize <= 0}
     *                                  or {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException     if {@code workQueue} or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
            throw new IllegalArgumentException(
                    "invalid pool sizes: core=" + corePoolSize + ", max=" + maximumPoolSize);
        }
        if (workQueue == null) {
            throw new NullPointerException("workQueue");
        }
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
    }

    /**
     * Submits a task following the policy described on the class.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        synchronized (workers) {
            // Below the core size: run the task on a brand-new worker.
            if (ctl.get() < corePoolSize && addWorker(command, true)) {
                return;
            }
            // Otherwise queue it -- exactly once.
            if (workQueue.offer(command)) {
                // With no live worker nobody would ever drain the queue.
                if (ctl.get() == 0) {
                    addWorker(null, false);
                }
                return;
            }
            // Queue is full: try an extra worker, reject if the pool is saturated.
            if (!addWorker(command, false)) {
                System.out.println("任務" + command + "被線程池拒絕");
            }
        }
    }

    /**
     * Starts a new worker if the relevant size limit allows it.
     *
     * @param firstTask task the worker runs before polling the queue; may be null
     * @param core      {@code true} to bound by {@code corePoolSize}, {@code false} to bound by
     *                  {@code maximumPoolSize}
     * @return {@code true} if a worker thread was started
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        synchronized (workers) {
            int limit = core ? corePoolSize : maximumPoolSize;
            if (ctl.get() >= limit) {
                return false;
            }
            Worker w = new Worker(firstTask);
            Thread t = w.thread;
            if (t == null) {
                // The factory declined to create a thread.
                return false;
            }
            workers.add(w);
            ctl.incrementAndGet();
            boolean started = false;
            try {
                t.start();
                started = true;
            } finally {
                if (!started) {
                    // Undo the bookkeeping so a failed start does not consume a slot.
                    workers.remove(w);
                    ctl.decrementAndGet();
                }
            }
            return true;
        }
    }

    /** A pool thread's body: runs its first task, then drains the queue until it is empty. */
    private final class Worker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = threadFactory.newThread(this);
        }

        @Override
        public void run() {
            Runnable task = this.firstTask;
            this.firstTask = null; // do not keep a finished task reachable
            boolean abrupt = true;
            try {
                while (task != null || (task = nextTaskOrRetire()) != null) {
                    task.run();
                    task = null;
                }
                abrupt = false;
            } finally {
                if (abrupt) {
                    // A task (or the queue) threw: deregister and start a replacement so
                    // that tasks still waiting in the queue are not stranded.
                    synchronized (workers) {
                        workers.remove(this);
                        ctl.decrementAndGet();
                        addWorker(null, false);
                    }
                }
            }
        }

        /**
         * Fetches the next queued task. If the queue is empty, re-checks under the pool lock
         * and, if it is still empty, deregisters this worker.
         *
         * @return the next task, or {@code null} if this worker has retired
         */
        private Runnable nextTaskOrRetire() {
            Runnable next = workQueue.take();
            if (next != null) {
                return next;
            }
            synchronized (workers) {
                // execute() enqueues under the same lock, so a task can never slip in
                // between this final check and the deregistration below.
                next = workQueue.take();
                if (next == null) {
                    workers.remove(this);
                    ctl.decrementAndGet();
                }
                return next;
            }
        }
    }
}
四、建立線程池的工具類 Executors
/**
 * Static factory methods for creating pre-configured thread pools.
 */
public final class Executors {
    /** Capacity of the work queue given to pools created by this class. */
    private static final int WORK_QUEUE_CAPACITY = 2;

    /** Not instantiable: this class only hosts static factories. */
    private Executors() {
    }

    /**
     * Creates a pool whose core size and maximum size are both {@code nThreads}, backed by a
     * work queue holding at most {@value #WORK_QUEUE_CAPACITY} waiting tasks.
     *
     * @param nThreads the number of worker threads the pool may run
     * @return the new pool
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(
                nThreads,
                nThreads,
                new LinkedBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
                new MyThreadFactory());
    }
}
2、測試線程池的功能
/**
 * Manual demo: submits eight slow tasks to a pool with core size 2, maximum size 4 and a queue
 * of capacity 3, so that queuing, extra worker creation and rejection can all be observed on
 * the console.
 */
public class ThreadTest {
    /** One label per submitted task; each task prints its label on every round. */
    private static final String[] TASK_LABELS = {
        "111", "222", "333", "444", "555", "666", "777", "888"
    };

    /** How many lines each task prints. */
    private static final int ROUNDS = 5;

    /** Pause between two lines of the same task, in milliseconds. */
    private static final long PAUSE_MILLIS = 5000L;

    public static void main(String[] args) {
        // Option 1: create the pool through the Executors utility class.
        // ExecutorService executors = Executors.newFixedThreadPool(2);

        // Option 2: instantiate ThreadPoolExecutor directly with different
        // corePoolSize and maximumPoolSize values.
        ExecutorService executors = new ThreadPoolExecutor(2, 4,
                new LinkedBlockingQueue<Runnable>(3), new MyThreadFactory());

        for (String label : TASK_LABELS) {
            executors.execute(newPrintingTask(label));
        }
        System.out.println("主函數結束");
    }

    /**
     * Builds a task that prints "&lt;thread name&gt;-- &lt;label&gt;" {@link #ROUNDS} times,
     * sleeping {@link #PAUSE_MILLIS} ms after each line.
     *
     * @param label text identifying the task in the output
     * @return the task
     */
    private static Runnable newPrintingTask(final String label) {
        return new Runnable() {
            @Override
            public void run() {
                for (int round = 0; round < ROUNDS; round++) {
                    System.out.println(Thread.currentThread().getName() + "-- " + label);
                    try {
                        Thread.sleep(PAUSE_MILLIS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread and stop early.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            /** Lets the pool's rejection message identify which task was refused. */
            @Override
            public String toString() {
                return "task-" + label;
            }
        };
    }
}
從輸出結果可以看出,線程池一共啓動了4個線程,並且有線程執行了多個任務,也有任務被拒絕。