用過消息隊列?Kafka?可否手寫一個消息隊列?懵

是否有一樣的經歷?面試官問你作過啥項目,我一頓胡侃,項目利用到了消息隊列,kafka,rocketMQ等等。面試

好的,那請開始你的表演,面試官遞過一支筆:給我手寫一個消息隊列!!WHAT?多線程

爲了你們遇到這種場景還能愉快的zhuangbi,因此寫一篇文章,湊合用一下。併發

想要實現一個消息隊列,咱們須要關組如下幾點:ide

1.首先有一個隊列(FIFO)來存放消息函數

2.消息隊列容量有限測試

3.須要入隊,出隊方法this

4.須要考慮多線程併發狀況spa

 

 

<1>.簡單版:用LinkedList實現一個簡單的消息隊列線程

這裏用LinkedList來實現隊列,而後經過synchronized關鍵字來實現多線程的互斥,用LinkedList的addLast方法實現隊列的push,用LinkedList的removeFirst實現隊列的remove方法code

//實現FIFO隊列
public class MyList<T> {
    private LinkedList<T> storage = new LinkedList<T>();
    private int statckSize = 2000;
    public synchronized void push(T e) {//須要加上同步
        storage.addLast(e);
    }

    public synchronized T peek() {
        if(storage!=null&&storage.size()>0){
            return storage.peekFirst();
        }
        return null;

    }

    public void remove() {
        storage.removeFirst();
    }

    public boolean empty() {
        return storage.isEmpty();
    }
}
View Code

測試類:

public class ListTest {
    public static void main(String[] args) {
        MyList<String> myList = new MyList<String>();
        for(String s : "the prefect code".split(" ")){//LIFO
            myList.push(s);
        }
        while(!myList.empty()){
            System.out.print(myList.peek()+" ");
            myList.remove();
        }
    }

}
View Code

 

<2>.進階版,仍然用LinkedList來實現隊列,給出倉庫的概念(消息隊列倉庫),生產者和消費者分別在獨立線程中實現,使用object的wait(),notify()和synchronized()實現線程操做的同步與互斥(Obj.wait(),與Obj.notify()必需要與synchronized(Obj)一塊兒使用,也就是wait,與notify是針對已經獲取了Obj鎖進行操做,從語法角度來講就是Obj.wait(),Obj.notify必須在synchronized(Obj){...}語句塊內。)

抽象倉庫類:

public interface AbstractStorage {
    void consumer(int num);
    void producer(int num);
}
View Code

生產者線程:

class Producer extends Thread {
    //生產數量
    private int num;
    //倉庫
    private AbstractStorage abstractStorage;

    public Producer(AbstractStorage abstractStorage,int num){
        this.abstractStorage=abstractStorage;
        this.num=num;
    }
    // 調用倉庫Storage的生產函數
    public void produce(int num){
        abstractStorage.producer(num);
    }
    // 線程run函數
    @Override
    public void run(){
        produce(num);
    }

}
View Code

消費者線程:

class Consumer extends Thread {
     //消費數量
     private int num;
     //倉庫
     private AbstractStorage abstractStorage;

     public Consumer(AbstractStorage abstractStorage,int num){
        this.abstractStorage=abstractStorage;
        this.num=num;
     }

     public void consume(int num){
         abstractStorage.consumer(num);
     }

     @Override
    public void run(){
         consume(num);
     }

}
View Code

消息隊列(倉庫)實現類:

public class Storage1 implements AbstractStorage {
    //最大容量
    private final int MAX_SIZE = 100;
    //存儲載體
    private LinkedList list =new LinkedList();

    @Override
    public void consumer(int num) {
        synchronized (list) {
            while (num > list.size()) {
                try {
                    list.wait();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("【阻塞】當前要消費的數量:" + num + ",當前庫存量:" + list.size() + "當前消費阻塞");
            }
            for (int i = 0; i < num; i++) {
                list.removeFirst();
            }
            System.out.println("【consumer】 "+Thread.currentThread().getName()+" 已消費產品數:" + num + ",現庫存數:" + list.size());
            list.notifyAll();
        }
    }

    //生產
    @Override
    public void producer(int num) {
        synchronized (list) {
            while (list.size() + num > MAX_SIZE) {
                try {
                    list.wait();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("【阻塞】當前隊列已滿,生產阻塞");
            }
            for (int i = 0; i < num; i++) {
                list.addLast(new Object());
            }
            System.out.println("【producer】 "+Thread.currentThread().getName()+ " 已生產產品數:" + num + ",現庫存數:" + list.size());
            list.notifyAll();
        }
    }


}
View Code

測試類:

public class Test {
    public static void main(String[] args) {
        AbstractStorage abstractStorage =new Storage1();

        //生產者對象
        Producer p1 = new Producer(abstractStorage,10);
        Producer p2 = new Producer(abstractStorage,10);
        Producer p3 = new Producer(abstractStorage,10);
        Producer p4 = new Producer(abstractStorage,10);
        Producer p5 = new Producer(abstractStorage,10);
        Producer p6 = new Producer(abstractStorage,10);
        Producer p7 = new Producer(abstractStorage,10);
        Producer p8 = new Producer(abstractStorage,50);
        //消費者對象
        Consumer c1 = new Consumer(abstractStorage,20);
        Consumer c2 = new Consumer(abstractStorage,30);
        Consumer c3 = new Consumer(abstractStorage,50);

        c1.start();
        c2.start();
        c3.start();

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
        p8.start();

    }



}
View Code

最終結果顯示,咱們能實現簡單的生產消費,而且是線程同步的。

相關文章
相關標籤/搜索