自己動手寫線程池

1、線程池源碼如下

一、阻塞任務隊列 BlockingQueue

/**
 * Minimal task-queue contract used by the thread pool in this file.
 *
 * @param <E> type of the queued elements
 */
public interface BlockingQueue<E> {
  /**
   * Tries to append an element to the tail of the queue.
   *
   * @param e element to enqueue
   * @return {@code true} if the element was accepted, {@code false} if the
   *         queue refused it (the implementation in this file refuses when
   *         it is at capacity)
   */
  boolean offer(E e);

  /**
   * Removes and returns the element at the head of the queue.
   *
   * @return the head element; the implementation in this file returns
   *         {@code null} instead of waiting when the queue is empty
   */
  E take();
}

阻塞任務隊列實現類  LinkedBlockingQueue

import java.util.concurrent.atomic.AtomicInteger;

public class LinkedBlockingQueue<E> implements BlockingQueue<E> {
  private final AtomicInteger count = new AtomicInteger();
  private final int capacity;

  transient Node<E> head;
  private transient Node<E> last;

  public LinkedBlockingQueue(int capacity) {
    this.capacity = capacity;
    last = head = new Node<E>(null);
  }
  //向阻塞隊列的尾部添加一個任務
  @Override
  public boolean offer(E e) {
    //若是已達到隊列的容量,則拒接添加,返回false
    if(count.get() == capacity) {
      return false;
    }
    Node<E> node = new Node<E>(e);
    last = last.next = node;
    count.getAndIncrement();
    return true;
  }測試

  //從阻塞隊列的頭部取出一個任務
  @Override
  public E take() {
    Node<E> h = head.next;
    if(h == null) {
      return null;
    }
    E x = h.item;
    head = h;
    return x;
  }

  static class Node<E>{
    E item;
    Node<E> next;

    Node(E x){
      item = x;
    }
  }
}this

二、創建線程的工廠  ThreadFactory

/**
 * Creates the threads on which the pool runs its workers.
 */
public interface ThreadFactory {
  /**
   * Builds a thread that will execute the given runnable.
   *
   * @param r the code the new thread should run
   * @return the thread wrapping {@code r}; the pool in this file calls
   *         {@code start()} on it itself
   */
  Thread newThread(Runnable r);
}

創建線程的工廠實現類    MyThreadFactory

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread factory that names its threads {@code demo1}, {@code demo2}, ...
 * in creation order. The returned threads are not started.
 */
public class MyThreadFactory implements ThreadFactory {
  /** Sequence number handed to the next thread; starts at 1. */
  private final AtomicInteger nextId = new AtomicInteger(1);

  /**
   * Wraps the runnable in a new thread carrying the next sequential name.
   *
   * @param r the code the new thread should run
   * @return an unstarted thread named {@code "demo" + n}
   */
  @Override
  public Thread newThread(Runnable r) {
    final int id = nextId.getAndIncrement();
    final String name = "demo" + id;
    return new Thread(r, name);
  }
}

三、線程池接口   ExecutorService

/**
 * Entry point for handing work to a thread pool.
 */
public interface ExecutorService {

  /**
   * Submits a task for execution.
   *
   * @param command the task to run
   */
  void execute(Runnable command);

}

線程池實現類    ThreadPoolExecutor

import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExecutor implements ExecutorService{
  private final HashSet<Worker> workers = new HashSet<Worker>();
  private static final AtomicInteger ctl = new AtomicInteger(0);

  private final BlockingQueue<Runnable> workQueue;
  private volatile ThreadFactory threadFactory;
  private volatile int corePoolSize;
  private volatile int maximumPoolSize;

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory) {
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.threadFactory = threadFactory;
  }

  @Override
  public void execute(Runnable command) {
    //若是線程池當前線程數小於核心線程數,則調用addWorker方法增長一個工做線程
    if(ctl.get() < corePoolSize) {
      if(addWorker(command, true)) {
        return;
      }
    }
    //不然嘗試將該任務添加到任務隊列
    if(workQueue.offer(command)) {
      workQueue.offer(command);
    }else{//若是添加該任務到任務隊列失敗則說明任務隊列已滿,新開啓線程執行該任務
      //若是新開啓線程失敗
      if(!addWorker(command, false)) {
        System.out.println("任務" + command + "被線程池拒絕");
      }
    }
  }

  private boolean addWorker(Runnable firstTask, boolean core) {
    int c = ctl.get();
    if(c >= (core ? corePoolSize : maximumPoolSize)) {
      return false;
    }
    ctl.compareAndSet(c, c+1);

    Worker w = new Worker(firstTask);
    final Thread t = w.thread;
    workers.add(w);
    t.start();

    return true;
  }

  private final class Worker implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
      this.firstTask = firstTask;
      this.thread = threadFactory.newThread(this);
    }

    @Override
    public void run() {
      Runnable task = this.firstTask;
      while (task != null || (task = workQueue.take()) != null) {
        task.run();
        task = null;
      }
    }
  }
}

四、創建線程池的工具類   Executors

/**
 * Convenience factory for pre-configured pools.
 */
public class Executors {
  /**
   * Builds a pool whose core and maximum sizes are both {@code nThreads},
   * backed by a queue of capacity 2 and a {@code MyThreadFactory}.
   *
   * @param nThreads value used for both the core and the maximum pool size
   * @return the newly created pool
   */
  public static ExecutorService newFixedThreadPool(int nThreads) {
    BlockingQueue<Runnable> pending = new LinkedBlockingQueue<Runnable>(2);
    ThreadFactory factory = new MyThreadFactory();
    return new ThreadPoolExecutor(nThreads, nThreads, pending, factory);
  }
}

 

2、測試線程池的功能

/**
 * Demo driver: submits eight slow printing tasks to a pool with 2 core
 * threads, at most 4 threads and a queue of capacity 3.
 */
public class ThreadTest {
  /** How many lines each task prints. */
  private static final int REPEATS = 5;
  /** Pause after each printed line, in milliseconds. */
  private static final long PAUSE_MILLIS = 5000L;

  public static void main(String[] args) {
    // Option 1: create the pool through the Executors helper:
    //   ExecutorService executors = Executors.newFixedThreadPool(2);
    // Option 2 (used here): construct ThreadPoolExecutor directly with a
    // corePoolSize that differs from maximumPoolSize.
    ExecutorService executors = new ThreadPoolExecutor(2, 4,
          new LinkedBlockingQueue<Runnable>(3), new MyThreadFactory());

    String[] labels = {"111", "222", "333", "444", "555", "666", "777", "888"};
    for (String label : labels) {
      executors.execute(printingTask(label));
    }

    System.out.println("主函數結束");
  }

  /**
   * Creates a task that prints the current thread name followed by
   * {@code "-- " + label}, {@link #REPEATS} times, pausing
   * {@link #PAUSE_MILLIS} ms after each line.
   *
   * @param label text that identifies the task in the output
   * @return the task; its {@code toString()} is {@code "task-" + label} so
   *         that a rejection message names the task
   */
  private static Runnable printingTask(final String label) {
    return new Runnable() {
      @Override
      public void run() {
        for (int i = 0; i < REPEATS; i++) {
          System.out.println(Thread.currentThread().getName() + "-- " + label);
          try {
            Thread.sleep(PAUSE_MILLIS);
          } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and stop early
            // instead of printing a stack trace and sleeping again.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      @Override
      public String toString() {
        return "task-" + label;
      }
    };
  }
}

 

從輸出結果可以看出,啓動了4個線程,並且有線程執行了多個任務,也有任務被拒絕。

相關文章
相關標籤/搜索