PriorityBlockingQueue 和 Executors.newCachedThreadPool()


一、PriorityBlockingQueue裏面存儲的對象必須是實現Comparable接口。java

二、隊列經過這個接口的compare方法肯定對象的優先級priority。 緩存

規則是:當前和其餘對象比較,若是compare方法返回負數,那麼在隊列裏面的優先級就比較高dom

下面的測試能夠說明這個斷言: ide

查看打印結果,比較take出來的Entity和left的entity,比較他們的priority測試

public class TestPriorityQueue { 

static Random r=new Random(47); 

public static void main(String args[]){ 

final PriorityBlockingQueue q = new PriorityBlockingQueue(); 

ExecutorService se = Executors.newCachedThreadPool(); 

final int qTime = r.nextInt(100);
//execute producer se.execute(new Runnable(){ public void run() { int i=0; while(true){ q.put(new PriorityEntity(r.nextInt(10), i++)); //優先級,索引 try { TimeUnit.MILLISECONDS.sleep(qTime); } catch (InterruptedException e) { e.printStackTrace(); } } } }); //execute consumer se.execute(new Runnable(){ public void run() { while(true){ try {
int rTime = r.nextInt(500);

//打印進隊和出隊的時間能夠看到隊列一直在累積 System.out.println(qTime + ": " rTime +
" --take-- "+q.take()+" left: "+q.size()+" -- ["+q.toString()+"]"); try { TimeUnit.MILLISECONDS.sleep(rTime); } catch (InterruptedException e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }); //try { // TimeUnit.SECONDS.sleep(5); //} catch (InterruptedException e) { // e.printStackTrace(); //} System.out.println("shutdown"); } } class PriorityEntity implements Comparable<PriorityEntity> { private int priority; private int index=0; public PriorityEntity(int _priority,int _index) { this.priority = _priority; this.index=_index; } public String toString(){ return "# [index="+index+" priority="+priority+"]"; } //數字小,優先級高 public int compareTo(PriorityEntity o) { return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0; } //數字大,優先級高 // public int compareTo(PriorityTask o) { // return this.priority < o.priority ? 1 : this.priority > o.priority ? -1 : 0; // } }

 

 

理解 newCachedThreadPool()this

一、建立線程池的根源:
spa

public ThreadPoolExecutor(int corePoolSize, 
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

 

二、使用場景:
2.一、耗時較短的任務。
2.二、任務處理速度 > 任務提交速度 ,這樣才能保證不會不斷建立新的進程,避免內存被佔滿。newCachedThreadPool能夠當作newFixedThreadPool(無窮大),雖然沒法限制線程總數,可是能夠減小沒必要要的線程建立和銷燬上的消耗,線程

三、取名爲cached-threadpool的緣由在於線程池中的線程是被線程池緩存了的,也就是說,線程沒有任務要執行時,便處於空閒狀態,處於空閒狀態的線程並不會被當即銷燬(會被緩存住),只有當空閒時間超出一段時間(默認爲60s)後,線程池纔會銷燬該線程(至關於清除過期的緩存)。新任務到達後,線程池首先會讓被緩存住的線程(空閒狀態)去執行任務,若是沒有可用線程(無空閒線程),便會建立新的線程。code

 

四、對象

看一段測試代碼:

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class newCachedThreadPoolTest {
public static void main(String[] args) {   ExecutorService executorService = Executors.newCachedThreadPool();   for (int i = 1; i < 10000; i++)   executorService.submit(new task());   }   }   class task implements Runnable {   @Override   public void run() {   try {   Thread.sleep(5000);   } catch (InterruptedException e) {   e.printStackTrace();     }   } }

 

運行結果爲:Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread


能夠看出來是堆外內存溢出,由於咱們新建的線程都在工做(代碼中用sleep表示在工做中),newCachedThreadPool 只會重用空閒而且可用的線程,因此上述代碼只能不停地建立新線程,在 64-bit JDK 1.7 中 -Xss 默認是 1024k,也就是 1M,那就是須要 10000*1M = 10G 的堆外內存空間來給線程使用,可是個人機器總共就 8G 內存,不夠建立新的線程,因此就 OOM 了。

 

因此這個newCachedThreadPool 你們通常不用就是這樣的緣由,由於它的最大值是在初始化的時候設置爲Integer.MAX_VALUE,通常來講機器都沒那麼大內存給它不斷使用。固然知道可能出問題的點,就能夠去重寫一個方法限制一下這個最大值,可是出於後期維護緣由,通常來講用 newFixedThreadPool 也就足夠了。

 

Java如何依據cpu核數設置合適的線程數

newFixedThreadPool 定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()獲取cpu核心數。

executorservice pool = executors.newfixedthreadpool(runtime.getruntime().availableprocessors()*10);通常建議每cpu不超過25個線程,固然多一點也沒什麼問題,不過就費點線程資源.

java.lang.Runtime.availableProcessors() 方法返回到Java虛擬機的可用的處理器數量。此值可能會改變在一個特定的虛擬機調用。應用程序可用處理器的數量是敏感的,所以偶爾查詢該屬性,並適當地調整本身的資源使用狀況.

相關文章
相關標籤/搜索