1 package java.util.concurrent; 2 3 /** 4 * 帶有緩存的線程池 5 */ 6 public static ExecutorService newCachedThreadPool() { 7 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 8 60L, TimeUnit.SECONDS, 9 new SynchronousQueue<Runnable>()); 10 }
1 // 容量 2 private final int capacity; 3 4 // 當前數量 5 private final AtomicInteger count = new AtomicInteger(0); 6 7 // 鏈表的表頭 8 transient Node<E> head; 9 10 // 鏈表的表尾 11 private transient Node<E> last; 12 13 // 用於控制刪除元素的【取出鎖】和鎖對應的【非空條件】 14 private final ReentrantLock takeLock = new ReentrantLock(); 15 private final Condition notEmpty = takeLock.newCondition(); 16 17 // 用於控制添加元素的【插入鎖】和鎖對應的【非滿條件】 18 private final ReentrantLock putLock = new ReentrantLock(); 19 private final Condition notFull = putLock.newCondition();
1 // 建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue 2 LinkedBlockingQueue() 3 4 // 建立一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加 5 LinkedBlockingQueue(Collection<? extends E> c) 6 7 // 建立一個具備給定(固定)容量的 LinkedBlockingQueue 8 LinkedBlockingQueue(int capacity) 9 10 // 從隊列完全移除全部元素 11 void clear() 12 13 // 將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false 14 boolean offer(E e) 15 16 // 將指定元素插入到此隊列的尾部,若有必要,則等待指定的時間以使空間變得可用 17 boolean offer(E e, long timeout, TimeUnit unit) 18 19 // 獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null 20 E peek() 21 22 // 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null 23 E poll() 24 25 // 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(若是有必要) 26 E poll(long timeout, TimeUnit unit) 27 28 // 將指定元素插入到此隊列的尾部,若有隊列滿,則等待空間變得可用 29 void put(E e) 30 31 // 返回理想狀況下(沒有內存和資源約束)此隊列可接受而且不會被阻塞的附加元素數量 32 int remainingCapacity() 33 34 // 今後隊列移除指定元素的單個實例(若是存在) 35 boolean remove(Object o) 36 37 // 返回隊列中的元素個數 38 int size() 39 40 // 獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要) 41 E take()
1 /** 2 * 將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量) 3 * 在成功時返回 true,若是此隊列已滿,則返回 false 4 * 若是使用了有容量限制的隊列,推薦使用add方法,add方法在失敗的時候只是拋出異常 5 */ 6 public boolean offer(E e) { 7 if (e == null) throw new NullPointerException(); 8 final AtomicInteger count = this.count; 9 if (count.get() == capacity) 10 // 若是隊列已滿,則返回false,表示插入失敗 11 return false; 12 int c = -1; 13 Node<E> node = new Node<E>(e); 14 final ReentrantLock putLock = this.putLock; 15 // 獲取 putLock 16 putLock.lock(); 17 try { 18 // 再次對【隊列是否是滿】的進行判斷,若是不是滿的,則插入節點 19 if (count.get() < capacity) { 20 enqueue(node); // 在隊尾插入節點 21 c = count.getAndIncrement(); // 當前節點數量+1,並返回插入以前節點數量 22 if (c + 1 < capacity) 23 // 若是在插入元素以後,隊列仍然未滿,則喚醒notFull上的等待線程 24 notFull.signal(); 25 } 26 } finally { 27 // 釋放 putLock 28 putLock.unlock(); 29 } 30 if (c == 0) 31 // 若是在插入節點前,隊列爲空,那麼插入節點後,喚醒notEmpty上的等待線程 32 signalNotEmpty(); 33 return c >= 0; 34 }
下面來看看 put(E e) 的源碼:php
1 /** 2 * 將指定元素插入到此隊列的尾部,若有隊列滿,則等待空間變得可用 3 * 4 * @throws InterruptedException {@inheritDoc} 5 * @throws NullPointerException {@inheritDoc} 6 */ 7 public void put(E e) throws InterruptedException { 8 if (e == null) throw new NullPointerException(); 9 10 int c = -1; 11 Node<E> node = new Node<E>(e); 12 final ReentrantLock putLock = this.putLock; 13 final AtomicInteger count = this.count; 14 putLock.lockInterruptibly(); // 可中斷地獲取 putLock 15 try { 16 // count 變量是被 putLock 和 takeLock 保護起來的,因此能夠真實反映隊列當前的容量狀況 17 while (count.get() == capacity) { 18 notFull.await(); 19 } 20 enqueue(node); // 在隊尾插入節點 21 c = count.getAndIncrement(); // 當前節點數量+1,並返回插入以前節點數量 22 if (c + 1 < capacity) 23 // 若是在插入元素以後,隊列仍然未滿,則喚醒notFull上的等待線程 24 notFull.signal(); 25 } finally { 26 putLock.unlock(); // 釋放 putLock 27 } 28 if (c == 0) 29 // 若是在插入節點前,隊列爲空,那麼插入節點後,喚醒notEmpty上的等待線程 30 signalNotEmpty(); 31 }
1 /** 2 * 通知一個等待的take。該方法應該僅僅從put/offer調用,不然通常很難鎖住takeLock 3 */ 4 private void signalNotEmpty() { 5 final ReentrantLock takeLock = this.takeLock; 6 takeLock.lock(); // 獲取 takeLock 7 try { 8 notEmpty.signal(); // 喚醒notEmpty上的等待線程,意味着如今能夠獲取元素了 9 } finally { 10 takeLock.unlock(); // 釋放 takeLock 11 } 12 }
1 /** 2 * 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null 3 */ 4 public E poll() { 5 final AtomicInteger count = this.count; 6 if (count.get() == 0) 7 return null; 8 E x = null; 9 int c = -1; 10 final ReentrantLock takeLock = this.takeLock; 11 takeLock.lock(); // 獲取 takeLock 12 try { 13 if (count.get() > 0) { 14 x = dequeue(); // 獲取隊頭元素,並移除 15 c = count.getAndDecrement(); // 當前節點數量-1,並返回移除以前節點數量 16 if (c > 1) 17 // 若是在移除元素以後,隊列中仍然有元素,則喚醒notEmpty上的等待線程 18 notEmpty.signal(); 19 } 20 } finally { 21 takeLock.unlock(); // 釋放 takeLock 22 } 23 if (c == capacity) 24 // 若是在移除節點前,隊列是滿的,那麼移除節點後,喚醒notFull上的等待線程 25 signalNotFull(); 26 return x; 27 }
1 /** 2 * 取出並返回隊列的頭。若隊列爲空,則一直等待 3 */ 4 public E take() throws InterruptedException { 5 E x; 6 int c = -1; 7 final AtomicInteger count = this.count; 8 final ReentrantLock takeLock = this.takeLock; 9 // 獲取 takeLock,若當前線程是中斷狀態,則拋出InterruptedException異常 10 takeLock.lockInterruptibly(); 11 try { 12 // 若隊列爲空,則一直等待 13 while (count.get() == 0) { 14 notEmpty.await(); 15 } 16 x = dequeue(); // 從隊頭取出元素 17 c = count.getAndDecrement(); // 取出元素以後,節點數量-1;並返回移除以前的節點數量 18 if (c > 1) 19 // 若是在移除元素以後,隊列中仍然有元素,則喚醒notEmpty上的等待線程 20 notEmpty.signal(); 21 } finally { 22 takeLock.unlock(); // 釋放 takeLock 23 } 24 25 if (c == capacity) 26 // 若是在取出元素以前,隊列是滿的,就在取出元素以後,喚醒notFull上的等待線程 27 signalNotFull(); 28 return x; 29 }
1 /** 2 * 喚醒notFull上的等待線程,只能從 poll 或 take 調用 3 */ 4 private void signalNotFull() { 5 final ReentrantLock putLock = this.putLock; 6 putLock.lock(); // putLock 上鎖 7 try { 8 notFull.signal(); // 喚醒notFull上的等待線程,意味着能夠插入元素了 9 } finally { 10 putLock.unlock(); // putLock 解鎖 11 } 12 }
1 package cn.com.gkmeteor.threadpool.utils; 2 3 @Component 4 public class ThreadPoolUtil implements InitializingBean { 5 6 public static int POOL_SIZE = 10; 7 8 @Autowired 9 private ThreadExecutorService threadExecutorService; // 具體的線程處理類 10 11 private List<ThreadWithQueue> threadpool = new ArrayList<>(); 12 13 /** 14 * 在全部基礎屬性初始化完成後,初始化當前類 15 * 16 * @throws Exception 17 */ 18 @Override 19 public void afterPropertiesSet() throws Exception { 20 for (int i = 0; i < POOL_SIZE; i++) { 21 ThreadWithQueue threadWithQueue = new ThreadWithQueue(i, threadExecutorService); 22 this.threadpool.add(threadWithQueue); 23 } 24 } 25 }
1 public static int POOL_SIZE = 10; // 線程池容量 2 index = (++index) % POOL_SIZE; // index 是當前選中的線程下標
1 package cn.com.gkmeteor.threadpool.utils; 2 3 import cn.com.gkmeteor.threadpool.service.ThreadExecutorService; 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import java.util.concurrent.BlockingQueue; 8 9 /** 10 * 帶有【參數阻塞隊列】的線程 11 */ 12 public class ThreadWithQueue extends Thread { 13 14 public static int CAPACITY = 10; 15 16 private Logger logger = LoggerFactory.getLogger(ThreadWithQueue.class); 17 18 private BlockingQueue<String> queue; 19 20 private ThreadExecutorService threadExecutorService; // 線程運行後的業務邏輯處理 21 22 private String threadName; 23 24 public String getThreadName() { 25 return threadName; 26 } 27 28 public void setThreadName(String threadName) { 29 this.threadName = threadName; 30 } 31 32 /** 33 * 構造方法 34 * 35 * @param i 第幾個線程 36 * @param threadExecutorService 線程運行後的業務邏輯處理 37 */ 38 public ThreadWithQueue(int i, ThreadExecutorService threadExecutorService) { 39 queue = new java.util.concurrent.LinkedBlockingQueue<>(CAPACITY); 40 threadName = "Thread(" + i + ")"; 41 42 this.threadExecutorService = threadExecutorService; 43 44 this.start(); 45 } 46 47 /** 48 * 將參數放到線程的參數隊列中 49 * 50 * @param param 參數 51 * @return 52 */ 53 public String paramAdded(String param) { 54 String result = ""; 55 if(queue.offer(param)) { 56 logger.info("參數已入隊,{} 目前參數個數 {}", this.getThreadName(), queue.size()); 57 result = "參數已加入線程池,等待處理"; 58 } else { 59 logger.info("隊列已達最大容量,請稍後重試"); 60 result = "線程池已滿,請稍後重試"; 61 } 62 return result; 63 } 64 65 public synchronized int getQueueSize() { 66 return queue.size(); 67 } 68 69 @Override 70 public void run() { 71 while (true) { 72 try { 73 String param = queue.take(); 74 logger.info("{} 開始運行,參數隊列中還有 {} 個在等待", this.getThreadName(), this.getQueueSize()); 75 if (param.startsWith("contact")) { 76 threadExecutorService.doContact(param); 77 } else if (param.startsWith("user")) { 78 threadExecutorService.doUser(param); 79 } else { 80 logger.info("參數無效,不作處理"); 81 } 82 logger.info("{} 本次處理完成", this.getThreadName()); 83 } catch (Exception e) { 84 e.printStackTrace(); 85 } 86 } 87 } 88 }
瞭解了連接阻塞隊列的底層方法後,使用起來就底氣十足。具體來講:java