是否有一樣的經歷?面試官問你作過啥項目,我一頓胡侃,項目利用到了消息隊列,kafka,rocketMQ等等。面試
好的,那請開始你的表演,面試官遞過一支筆:給我手寫一個消息隊列!!WHAT?多線程
爲了你們遇到這種場景還能愉快的zhuangbi,因此寫一篇文章,湊合用一下。併發
想要實現一個消息隊列,咱們須要關組如下幾點:ide
1.首先有一個隊列(FIFO)來存放消息函數
2.消息隊列容量有限測試
3.須要入隊,出隊方法this
4.須要考慮多線程併發狀況spa
<1>.簡單版:用LinkedList實現一個簡單的消息隊列線程
這裏用LinkedList來實現隊列,而後經過synchronized關鍵字來實現多線程的互斥,用LinkedList的addLast方法實現隊列的push,用LinkedList的removeFirst實現隊列的remove方法code
//實現FIFO隊列 public class MyList<T> { private LinkedList<T> storage = new LinkedList<T>(); private int statckSize = 2000; public synchronized void push(T e) {//須要加上同步 storage.addLast(e); } public synchronized T peek() { if(storage!=null&&storage.size()>0){ return storage.peekFirst(); } return null; } public void remove() { storage.removeFirst(); } public boolean empty() { return storage.isEmpty(); } }
測試類:
public class ListTest { public static void main(String[] args) { MyList<String> myList = new MyList<String>(); for(String s : "the prefect code".split(" ")){//LIFO myList.push(s); } while(!myList.empty()){ System.out.print(myList.peek()+" "); myList.remove(); } } }
<2>.進階版,仍然用LinkedList來實現隊列,給出倉庫的概念(消息隊列倉庫),生產者和消費者分別在獨立線程中實現,使用object的wait(),notify()和synchronized()實現線程操做的同步與互斥(Obj.wait(),與Obj.notify()必需要與synchronized(Obj)一塊兒使用,也就是wait,與notify是針對已經獲取了Obj鎖進行操做,從語法角度來講就是Obj.wait(),Obj.notify必須在synchronized(Obj){...}語句塊內。)
抽象倉庫類:
public interface AbstractStorage { void consumer(int num); void producer(int num); }
生產者線程:
class Producer extends Thread { //生產數量 private int num; //倉庫 private AbstractStorage abstractStorage; public Producer(AbstractStorage abstractStorage,int num){ this.abstractStorage=abstractStorage; this.num=num; } // 調用倉庫Storage的生產函數 public void produce(int num){ abstractStorage.producer(num); } // 線程run函數 @Override public void run(){ produce(num); } }
消費者線程:
class Consumer extends Thread { //消費數量 private int num; //倉庫 private AbstractStorage abstractStorage; public Consumer(AbstractStorage abstractStorage,int num){ this.abstractStorage=abstractStorage; this.num=num; } public void consume(int num){ abstractStorage.consumer(num); } @Override public void run(){ consume(num); } }
消息隊列(倉庫)實現類:
public class Storage1 implements AbstractStorage { //最大容量 private final int MAX_SIZE = 100; //存儲載體 private LinkedList list =new LinkedList(); @Override public void consumer(int num) { synchronized (list) { while (num > list.size()) { try { list.wait(); } catch (Exception e) { e.printStackTrace(); } System.out.println("【阻塞】當前要消費的數量:" + num + ",當前庫存量:" + list.size() + "當前消費阻塞"); } for (int i = 0; i < num; i++) { list.removeFirst(); } System.out.println("【consumer】 "+Thread.currentThread().getName()+" 已消費產品數:" + num + ",現庫存數:" + list.size()); list.notifyAll(); } } //生產 @Override public void producer(int num) { synchronized (list) { while (list.size() + num > MAX_SIZE) { try { list.wait(); } catch (Exception e) { e.printStackTrace(); } System.out.println("【阻塞】當前隊列已滿,生產阻塞"); } for (int i = 0; i < num; i++) { list.addLast(new Object()); } System.out.println("【producer】 "+Thread.currentThread().getName()+ " 已生產產品數:" + num + ",現庫存數:" + list.size()); list.notifyAll(); } } }
測試類:
public class Test { public static void main(String[] args) { AbstractStorage abstractStorage =new Storage1(); //生產者對象 Producer p1 = new Producer(abstractStorage,10); Producer p2 = new Producer(abstractStorage,10); Producer p3 = new Producer(abstractStorage,10); Producer p4 = new Producer(abstractStorage,10); Producer p5 = new Producer(abstractStorage,10); Producer p6 = new Producer(abstractStorage,10); Producer p7 = new Producer(abstractStorage,10); Producer p8 = new Producer(abstractStorage,50); //消費者對象 Consumer c1 = new Consumer(abstractStorage,20); Consumer c2 = new Consumer(abstractStorage,30); Consumer c3 = new Consumer(abstractStorage,50); c1.start(); c2.start(); c3.start(); p1.start(); p2.start(); p3.start(); p4.start(); p5.start(); p6.start(); p7.start(); p8.start(); } }
最終結果顯示,咱們能實現簡單的生產消費,而且是線程同步的。