題記——html
難過了,悄悄走一走;java
傷心了,默默睡一覺;多線程
優雅不是訓練出來的,而是一種閱歷;併發
淡然不是假裝出來的,而是一種沉澱;less
時間飛逝,老去的只是咱們的容顏;異步
時間彷彿一顆靈魂,愈來愈動人;ide
其餘站點:性能
一、回享每一時刻 http://jingyan.baidu.com/article/25648fc193fcbe9190fd004f.htmlthis
二、回眸每一點鐘 http://blog.csdn.net/zl18603543572/article/details/52012122spa
在多線程的世界中,是那麼的神奇 與 高效以及合理;
官方推薦使用Executors類工廠方法來建立線程池管理,Executors類是官方提供的一個工廠類,裏面封裝了好多功能不同的線程池,從而使得咱們建立線程池很是的簡單:
能夠看到1 - 3 建立線程池的方法中,所有是建立了ThreadPoolExecutor這個對象實例,不一樣的只是構造中的參數不一至,而在4 與5 ,從其繼承的角度來看
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
能夠看到其實質也是繼承於ThreadPoolExecutor這個對象實例。
也就是說上述一種類型的線程池其都是 ThreadPoolExecutor子類,其實直接建立ThreadPoolExecutor實例對象,只須要傳入相對應的配製參數,就能夠建立出來與上述五種效果相一至的線程池管理,只不過是在書寫的時候太過於繁鎖。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
從上述建立ThreadPoolEecutor實例的構造來講,
/** * 建立線程池也是須要資源的,因此線程池內線程數量的大小也會影響系統的性能, * 大了反而浪費資源,小了反而影響系統的吞吐量, * 因此咱們建立線程池須要把握一個度才能合理的發揮它的優勢, * 一般來講咱們要考慮的因素有CPU的數量、內存的大小、併發請求的數量等因素,按需調整。 *一般核心線程數能夠設爲CPU數量+1,而最大線程數能夠設爲CPU的數量*2+1。 */ private void customThreadFunction() { /** * 獲取CPU數量 */ int processors = Runtime.getRuntime().availableProcessors(); /** * 核心線程數量 */ int corePoolSize =processors + 1; /** * 最大線程數量 */ int maximumPoolSize = processors * 2 + 1; /** * 空閒有效時間 */ long keepAliveTime = 60; /** * 建立自定義線程池 */ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new PriorityBlockingQueue()); /** * 添加執行任務 */ for (int i=1;i<=20;i++){ final int prites = i; threadPoolExecutor.execute(new CustomRunnable(prites){ @Override public void doRun() { String name = Thread.currentThread().getName(); System.out.println("curentThread name is "+name +"and prites is "+prites); SystemClock.sleep(1000); } }); } }
public abstract class CustomRunnable implements Runnable,Comparable<CustomRunnable> { private int priority; public CustomRunnable(int priority) { if (priority<0) throw new IllegalArgumentException(); this.priority = priority; } @Override public int compareTo(CustomRunnable another) { int my = this.getPriority(); int other = another.getPriority(); if (my>other){ return -1; }else{ return 0; } } @Override public void run() { doRun(); } public abstract void doRun(); public int getPriority() { return priority; } }
public class CustomExpanThreadPool extends ThreadPoolExecutor { private CustomExpanThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public static CustomExpanThreadPool getInstance() { /** * 獲取CPU數量 */ int processors = Runtime.getRuntime().availableProcessors(); /** * 核心線程數量 */ int corePoolSize = processors + 1; /** * 最大線程數量 */ int maximumPoolSize = processors * 2 + 1; /** * 空閒有效時間 */ long keepAliveTime = 60; /** * 建立自定義線程池 */ return new CustomExpanThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new PriorityBlockingQueue()); } /** * 用於控制線程開始與中止執行的方法 */ private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); /** * 任務執行前要執行的方法 * * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); System.out.println(Thread.currentThread().getName() + " 任務執行開始 "); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { } } /** * 任務執行後要執行的方法 * * @param r * @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); System.out.println(Thread.currentThread().getName() + " 任務執行over "); } /** * 線程池關閉後要執行的方法 */ @Override protected void terminated() { super.terminated(); } /** * 暫停執行任務的方法 */ public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } /** * 恢復執行任務的方法 */ public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }
使用:
private void customThreadFunction2() { CustomExpanThreadPool threadPoolExecutor = CustomExpanThreadPool.getInstance(); /** * 添加執行任務 */ for (int i=1;i<=20;i++){ final int prites = i; threadPoolExecutor.execute(new CustomRunnable(prites){ @Override public void doRun() { String name = Thread.currentThread().getName(); System.out.println("curentThread name is "+name +"and prites is "+prites); SystemClock.sleep(1000); } }); } }
首先建立自定義的線程池(與標籤7)
/** * CPU數量 */ private int process = Runtime.getRuntime().availableProcessors(); /** * 核心線程數量 */ private int corePoolSize = process + 1; /** * 最大線程數量 */ private int maxPoolSize = process * 2 + 1; /** * 空閒有效時間 */ private long keepAliveTime = 60; /** * 自定義線程池 */ private ThreadPoolExecutor mThreadPoolExecutor; /** * 用於保存任務的集合 */ private Map<String, Future<Integer>> mThreadFutureTaskMap; private ThreadPoolManager(){ if (mThreadPoolExecutor == null) { synchronized (ThreadPoolManager.class){ if (mThreadPoolExecutor == null) { init(); } } } } public void init() { /** * 初始化建立線程池 */ mThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); @Override protected void beforeExecute(Thread t, Runnable r) { pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } /** * 暫停 */ public void pauseThreadPool() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } /** * 恢復任務 */ public void resumeThreadPool() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }; /** * 初始化保存任務的集合 */ mThreadFutureTaskMap = new HashMap<>(); }
而後就是建立用於控制任務的回調
public abstract class ThreadPoolCallable implements Callable<Integer> { @Override public Integer call() throws Exception { runCall(); return 1; } public abstract void runCall(); }
關於ThreadPoolCallable
能夠看到這裏是繼承於Callable,call方法中的回調返回的信息能夠經過些任務對象回調獲得,固然異步方法也是在call方法中執行
而後就是向線程池內添加任務
/** * 添加單個任務 *@param keyTag 任務的標籤 * @param task 任務 * @return 任務對象 */ public Future addTask(ProrityPoolCallable task, String keyTag) { //添加並提交任務 RunnableFuture futureTask = (RunnableFuture) mThreadPoolExecutor.submit(task); //將任務添加到集合中 Future put = mThreadFutureTaskMap.put(keyTag, futureTask); return futureTask; }
將每一個任務提交給線程池管理,而後會返回一個Future對象,看其源碼會發現其最終繼承Runable對象,而後呢咱們將這個Future對象以key,value形式存儲到Map集合中去,目的是當咱們要取消一個任務的時候,當這個任務已經被提交到線程池中,未執行或者是正在執行,咱們均可以經過傳的KEY,而後再從集合中取出Key對應的Futureccf對象,而後執行cancel方法,就能夠取消這個任務了
取消任務
/** * 移除單個任務 * @param key 任務標籤 * @return true 爲成功 */ public boolean removeTask(String key) { //任務移除標識 boolean cancel = false; //根據KEY來獲取對應的Future對象 Set<String> keySet = mThreadFutureTaskMap.keySet(); if (keySet.contains(key)) { Future<Integer> future = mThreadFutureTaskMap.get(key); //當前的任務已經被執行完畢,不進行操做 if (!future.isCancelled()) { cancel = future.cancel(true); if (cancel) { //當取消成功後,從本地Key集合中移除標識 mThreadFutureTaskMap.remove(key); } }else{ mThreadFutureTaskMap.remove(key); } } return cancel; }