線程通訊概念:線程是操做系統中獨立的個體,但這些個體若是不通過特殊的處理,就不能成爲一個總體,線程之間的通訊就成爲總體的必用方法之一。
使用 wait/notify 方法注意點:
1)wait 和 notify 必需要配合 synchronized 關鍵字使用
2)wait方法是釋放鎖的, notify方法不釋放鎖。java
BlockingQueue:是一個隊列,而且支持阻塞的機制,阻塞的放入和獲得數據。咱們要實現 LinkedBlockingQueue 下面兩個簡單的方法put 和 take
put(an object):把一個object 加到BlockingQueue裏,若是BlockingQueue沒有空間,則調用此方法的線程會被阻斷,直到BlockingQueue裏面有空間再繼續。
take:取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀直到BlockingQueue有新數據被加入。數組
import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class MyQueue { //1 須要一個承裝元素的集合 private LinkedList<Object> list = new LinkedList<Object>(); //2 須要一個計數器 AtomicInteger (原子性) private AtomicInteger count = new AtomicInteger(0); //3 須要制定上限和下限 private final int minSize = 0; private final int maxSize ; //4 構造方法 public MyQueue(int size){ this.maxSize = size; } //5 初始化一個對象 用於加鎖 private final Object lock = new Object(); //put(anObject): 把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷,直到BlockingQueue裏面有空間再繼續. public void put(Object obj){ synchronized (lock) { while(count.get() == this.maxSize){ try { lock.wait(); // 當Queue沒有空間時,線程被阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } //1 加入元素 list.add(obj); //2 計數器累加 count.incrementAndGet(); //3 新增元素後,通知另一個線程(喚醒),隊列多了一個元素,能夠作移除操做了。 lock.notify(); System.out.println("新加入的元素爲:" + obj); } } //take: 取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入. public Object take(){ Object ret = null; synchronized (lock) { while(count.get() == this.minSize){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //1 作移除元素操做 ret = list.removeFirst(); //2 計數器遞減 count.decrementAndGet(); //3 移除元素後,喚醒另一個線程,隊列少元素了,能夠再添加操做了 lock.notify(); } return ret; } public int getSize(){ return this.count.get(); } public static void main(String[] args) { final MyQueue mq = new MyQueue(5); mq.put("a"); mq.put("b"); mq.put("c"); mq.put("d"); mq.put("e"); System.out.println("當前容器的長度:" + mq.getSize()); Thread t1 = new Thread(new Runnable() { @Override public void run() { mq.put("f"); mq.put("g"); } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { Object o1 = mq.take(); System.out.println("移除的元素爲:" + o1); Object o2 = mq.take(); System.out.println("移除的元素爲:" + o2); } },"t2"); t1.start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } t2.start(); } }
從打印的信息能夠看出,當t2 線程移除數據後,t1線程纔開始加入數據緩存
Queue的主要實現以下圖所示。
圖片描述安全
Queue主要分兩類,一類是高性能隊列 ConcurrentLinkedQueue; 一類是阻塞隊列 BlockingQueue架構
ConcurrentLinkedQueue:是一個適合高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能。一般ConcurrentLinkedQueue性能好於BlockingQueue。它是一個基於連接節點的無界限線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最後加入的。該隊列不容許null元素。
ConcurrentLinkedQueue 重要方法:
add() 和 offer() 都是加入元素,該隊列中無區別
poll() 和 peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會併發
import java.util.concurrent.ConcurrentLinkedQueue; public class MyConcurrentLinkedQueue { public static void main(String[] args) throws Exception { //高性能無阻塞無界隊列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println(q.poll()); // 打印結果:a (從頭部取出元素,並從隊列裏刪除) System.out.println(q.size()); // 打印結果:4 (執行poll 後 元素減小一個) System.out.println(q.peek()); // 打印結果:b (a 被移除了,首元素就是b) System.out.println(q.size()); // 打印結果:4 (peek 不移除元素) } }
ArrayBlockingQueue: 基於數組的阻塞隊列實現。在內部,維護了一個定長數組,以便緩存隊列中的數據對象。其內部沒有實現讀寫分離,也就意味着生產和消費不能徹底並行。長度是須要定義的,能夠指定先進先出或者先進後出,是一個有界隊列。ide
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class MyArrayBlockingQueue { public static void main(String[] args) throws Exception { ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); // 能夠嘗試 隊列長度由3改到5 array.offer("offer 方法 插入數據成功返回true 不然返回false"); array.offer("3秒後插入數據", 3, TimeUnit.SECONDS); array.put("put 方法 若超出長度就會阻塞等待"); array.add("add 方法 在超出長度時會提示錯誤信息 java.lang.IllegalStateException"); // java.lang.IllegalStateException: Queue full System.out.println(array.offer("true or false", 3, TimeUnit.SECONDS)); System.out.println(array); } }
LinkedBlockingQueue:基於列表的阻塞隊列。同ArrayBlockingQueue相似,其內部維護了一個數據緩衝隊列(該隊列由一個鏈表構成)。它之因此可以高效的處理併發數據,是由於其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操做的徹底並行運行。是一個無界隊列
用法和 ArrayBlockingQueue 差很少 。區別在於,LinkedBlockingQueue是無界隊列,初始化的時候,能夠設置一個長度,也能夠不設置。
SynchronousQueue:一種沒有緩衝的隊列,生存者生產的數據直接會被消費者獲取並消費。函數
以上三個隊列,用於什麼場景呢?舉個坐地鐵例子:
在人少的時候,直接刷卡進站,無需等待,這用SynchronousQueue。
上班高峯期,人不少,刷卡的時候須要排隊,這用LinkedBlockingQueue無界隊列。
放假高峯期,人滿人患,這時候就要用有界隊列ArrayBlockingQueue。若是採用LinkedBlockingQueue 無界隊列的話,進來的人太多會影響地鐵站正常工做了,因此人太多就不讓進,等下次。高併發
PriorityBlockingQueue:基於優先級的阻塞隊列(優先級的判斷經過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖,是一個無界隊列。
傳入隊列的對象:Task工具
public class Task implements Comparable<Task>{ private int id ; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task task) { return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); } public String toString(){ return this.id + "," + this.name; } }
PriorityBlockingQueue 排序:
import java.util.concurrent.PriorityBlockingQueue; public class MyPriorityBlockingQueue { public static void main(String[] args) throws Exception{ PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>(); // 由大到小的設置 Task t1 = new Task(); t1.setId(1); t1.setName("id爲1"); Task t2 = new Task(); t2.setId(4); t2.setName("id爲4"); Task t3 = new Task(); t3.setId(9); t3.setName("id爲9"); Task t4 = new Task(); t4.setId(16); t4.setName("id爲16"); Task t5 = new Task(); t5.setId(5); t5.setName("id爲5"); // 故意打亂順序進入隊列 q.add(t3); q.add(t4); q.add(t1); q.add(t2); q.add(t5); System.out.println("初始隊列容器:" + q); System.out.println("第一個元素:" + q.take()); // 執行take後排序(取值後排序輸出) System.out.println("執行take方法後容器:" + q); } }
DelayQueue:帶有延遲時間的Queue,其中的元素只有指定的延遲時間到了,纔可以從隊列中獲取到該元素,DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景不少,好比對緩存超時的數據進行移除,任務超時處理,空閒鏈接的關閉等等
摘錄網上代碼,一個網吧上網的案例:
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Wangmin implements Delayed { private String name; //身份證 private String id; //截止時間 private long endTime; //定義時間工具類 private TimeUnit timeUnit = TimeUnit.SECONDS; public Wangmin(String name,String id,long endTime){ this.name=name; this.id=id; this.endTime = endTime; } public String getName(){ return this.name; } public String getId(){ return this.id; } /** * 用來判斷是否到了截止時間 */ @Override public long getDelay(TimeUnit unit) { //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); return endTime - System.currentTimeMillis(); } /** * 相互批較排序用 */ @Override public int compareTo(Delayed delayed) { Wangmin w = (Wangmin)delayed; return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0; } }
import java.util.concurrent.DelayQueue; public class WangBa implements Runnable { private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>(); public boolean yinye =true; public void shangji(String name,String id,int money){ Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機..."); this.queue.add(man); } public void xiaji(Wangmin man){ System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機..."); } @Override public void run() { while(yinye){ try { Wangmin man = queue.take(); xiaji(man); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String args[]){ try{ System.out.println("網吧開始營業"); WangBa siyu = new WangBa(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲", "123", 1); siyu.shangji("路人乙", "234", 10); siyu.shangji("路人丙", "345", 5); } catch(Exception e){ e.printStackTrace(); } } }