RabbitMQ入門:發佈/訂閱(Publish/Subscribe)

在前面的兩篇博客中html

遇到的實例都是一個消息只發送給一個消費者(工做者),他們的消息模型分別爲(P表明生產者,C表明消費者,紅色表明隊列):ide

此次咱們來看下將一個消息發送給多個消費者(工做者),這種模式通常被稱爲「發佈/訂閱」模式。其工做模型爲(P表明生產者,X表明Exchange(路由器/交換機),C表明消費者,紅色表明隊列):ui

咱們發現,工做模型中首次出現路由器,而且每一個消費者有單獨的隊列。生產者生成消息後將其發送給路由器,而後路由器轉送到隊列,消費者各自到本身的隊列裏面獲取消息進行消費。在實際的應用場景中,生產者通常不會直接將消息發送給隊列,而是發送給路由器進行中轉,Exchange必須清楚的知道怎麼處理收到的消息:是將消息發送到一個特定隊列仍是多有隊列,或者直接廢棄消息。這種才符合RabbitMQ消息模型的核心思想this

接下來咱們詳細展開今天的話題:spa

1、Exchange3d

Exchange在咱們的工做模型中首次出現,所以須要詳細介紹下。日誌

Exchange分爲4種類型:code

Direct:徹底根據key進行投遞的,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。
Topic:對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。
Fanout:不須要key,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
Headers:咱們能夠不考慮它。

今天咱們的實例採用fanout類型的exchange。htm

儘管首次出現,可是其實咱們前面的案例中也有用到exchange,只是咱們沒有給他名字,用的是RabbitMQ默認的,好比下面這段代碼,咱們將路由器名這個參數傳入了「」,若是咱們須要本身聲明exchange的話,這個就不能傳入「」了,而是傳入本身定義好的值。blog

2、臨時隊列

前面兩篇博客中,咱們都在使用隊列的時候給出了定義好的名字,這在生產者和消費者共用相同隊列的時候頗有必要,可是咱們有了exchange,生產者不須要知道有哪些隊列,所以隊列名字能夠不用指定了,而是經過RabbitMQ 接口本身去生成臨時隊列,隊列名字也由RabbitMQ自動生成。經過

能夠聲明一個非持久的、通道獨佔的、自動刪除的隊列,getQueue()方法能夠獲取隨機隊列名字。這個名字用來在隊列和exchange之間創建binding關係的時候使用:

 

3、代碼實現

基於上面exchange和臨時隊列的知識鋪墊,能夠展開今天的代碼實現了。

  1.  生產者
    public class Product {
        //exchange名字
        public static String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明exchange和exchange的類型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                
                String msg = " hello rabbitmq, this is publish/subscribe mode";
                // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉鏈接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     

  2. 消費者1
    public class Consumer1 {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明exchange以及exchange類型
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // 3.建立隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.創建exchange和隊列的綁定關係
                channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                // 5.經過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容而後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

     

  3. 消費者2,核心代碼同消費者1同樣,只是在日誌打印上將"Consumer1"改成"Consumer2"而已。這裏再也不列出具體代碼。
  4. 先運行消費者1和2,而後運行生產者,觀察控制檯log打印狀況:
    生產者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode
    
    消費者1**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
    
    消費者2: **** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    能夠看到,當生產者發出消息後,兩個消費者最終都收到了消息。

  5. 咱們去查看RabbitMQ管理頁面:

    在Exchanges 標籤頁裏面多了一個名爲「exchange」的路由器,他的類型是fanout。點exchange 的link進入詳細頁面:

    發如今binding項目中有了兩條綁定關係,隊列的名字也能夠看到。將頁面切換到Queues標籤頁:

    出現了兩個新的隊列,隊列名字和綁定關係中的同樣,而且隊列都是自動刪除的、通道獨佔的。

  6. 而後將消費者1和消費者2都停掉,從新查看管理頁面,咱們發現exchange還在,binding關係不存在了,臨時隊列也自動刪除了

相關文章
相關標籤/搜索