RabbitMQ - Start Up

  • 開始以前

  rabbitmq是一個被普遍使用的消息隊列,它是由erlang編寫的,根據AMQP協議設計實現的。html

  AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
  RabbitMQ是一個開源的AMQP實現,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。java

  下載rabbitMq數據庫

  http://www.rabbitmq.com/download.htmlapi

  erlang環境安全

  http://www.erlang.org/服務器

  在啓動rabbitMq以前須要安裝erlang環境,而且erlang環境和rabbitMq版本要匹配異步

 

  • AMQP

  ​AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件同產品,不一樣的開發語言等條件的限制。socket

  AMQ Model分佈式

   

  Message Queue

​   消息隊列會將消息存儲到內存或者磁盤中,並將這些消息按照必定順序轉發給一個或者多個消費者,每一個消息隊列都是獨立隔離的,相互不影響。ide

​   消息隊列具備不一樣的屬性:私有,共享,持久化,臨時,客戶端定義 或者服務端定義等,能夠基於實際需求選擇對應的類型,以 RabbitMQ 隊列特性爲例:

   共享持久化消息隊列:將發送的消息存儲到磁盤,而後將消息轉發給訂閱該隊列的全部消費者;

   私有臨時消息隊列:RabbitMQ 支持 rpc 調用,再調用過程當中消費者都會臨時生成一個消息隊列,只有當前消費者可見,且由服務端生成,調用完就會銷燬隊列。

  Exchange

   交換機收到生產者投遞的消息,基於路由規則及隊列綁定關係匹配到投遞對應的交換機或者隊列進行分發,交換機不存儲消息,只作轉發。

   AMQP定義了許多標準交換類型,基本涵蓋了消息傳遞所需的路由類型,通常 AMQP 服務器都會提供默認的交換機基於交換機類型命名,AMQP 的應用程序也能夠建立本身的交換機用於綁定指定的消息隊列發佈消息。

   RabbitMQ

 

  • 概念

  channel

    channel是定義每個生產者或者消費者與Mq的鏈接,Mq的各類事務都是以channel爲基原本擴展的,它相似於一個socket鏈接。

 

  exchanger

    交換器。在rabbitMq設計的內部,是經過exchanger來決定消息怎麼分發到queue上的。定義了交換器須要把隊列綁定到交換器上,並設置規則,這樣交換器會將消息分發到對應的queue上。一般的交換器有四種:direct,fanout,topic,header。

  routekey

    路由鍵,能夠看作是對交換器的一種補充。給消息設置一個路由鍵,隊列會拿到帶有它感興趣的路由鍵消息。routing key設定的長度限制爲255 bytes。

  queue

    隊列。做爲消費者須要關心的元素,它承載着消息的存放與管理功能。全部的消息最終會投遞到隊列上,而隊列上的消費者會取走它們並執行消費者的任務。消費者能夠拒收消息,並告訴隊列把它從新入隊或者丟棄,隊列能夠是channel獨佔的,也能夠是共享的。而且隊列提供了持久化的功能。

  • 開始搬磚

  引入amqp客戶端

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.1</version>
        </dependency>

  amqp客戶端提供的api都是按照以上的概念設計的。

  首先咱們須要創建與消息隊列的鏈接,並拿到這個鏈接。

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(Properties.host);
            //建立一個新的鏈接
            Connection connection = factory.newConnection();

 

  而後咱們拿到一個channel。

            //建立一個通道
            Channel channel = connection.createChannel();

  接着咱們就在這個channel上作各類操做了。

  這時候須要用到咱們上述介紹的概念了,綁定交換器,設置路由鍵,而且把隊列綁定到交換器上。

  以下幾種交換器

  direct

 

   此類交換器是直接往隊列發送的一種交換器,從名字上很容易理解。

            //  聲明一個隊列
            channel.exchangeDeclare("basic","direct");
            channel.queueBind(queueName, "basic",Properties.routeKey);

  在rabbitMq管理頁面上能夠看到綁定的狀況。

  fanout

  扇形交換器,相似於一對多發送的交換器。它將詳細發送到具備相同特徵的queue上,是一種廣播特性的交換器。

 

 

            //扇形交換器 廣播
            channel.exchangeDeclare(exchangeName,"fanout");
            channel.queueBind(queueName1, exchangeName,routeKey);

   在管理頁面上現實了設置的隊列。

 

  topic

  主題交換器,將消息分發到訂閱了某個主題的queue裏。設置主題的關鍵是在於路由鍵是否符合必定的通配。

  

        // 聲明轉發器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
        //定義綁定鍵     
        String[] routing_keys = new String[] { "kernal.info", "cron.warning",    
                "auth.info", "kernel.critical" }; 
        for (String routing_key : routing_keys)    
        {     
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes());    
        } 

  在作消息發送的時候,圍繞着basicPublisher在作;作消息消費的時候,是圍繞着basicConsumer來作。

  

    /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    /**
     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue
     * @param autoAck true if the server should consider messages
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

  生產者關注的是exchanger和路由鍵,消費者關注的是queue。

    //基礎生產者
    public void basicPublish(String queueName,String message){
        try{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(Properties.host);
            //建立一個新的鏈接
            Connection connection = factory.newConnection();
            //建立一個通道
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            //  聲明一個隊列
            channel.exchangeDeclare("basic","direct");
            channel.queueBind(queueName, "basic",Properties.routeKey);
            //添加一個監聽器
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", ack, multiple :"+multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", nack");
                }
            });
            //發送消息到隊列中
            for(int i=0;i<5;i++){
                channel.basicPublish("basic",Properties.routeKey, null, message.getBytes("UTF-8"));
                boolean isok = channel.waitForConfirms();
                if(isok){
                    System.out.println("Producer Send Message :" + message);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //基礎消費者
    public void basicConsumer(String queueName){
        try{
            // 建立鏈接工廠
            ConnectionFactory factory = new ConnectionFactory();
            //設置RabbitMQ地址
            factory.setHost(Properties.host);
            //建立一個新的鏈接
            Connection connection = factory.newConnection();
            //建立一個通道
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            //聲明要關注的隊列
            channel.queueBind(queueName,"basic",Properties.routeKey);
            System.out.println("Customer Waiting Received messages");
            //實現一個消費者邏輯
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Message tag:"+envelope.getDeliveryTag()+",Customer Received '" + message + "'");
                    //手動設置
                    if(envelope.getDeliveryTag() % 2 == 0){
                        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
                    }else {
                        this.getChannel().basicReject(envelope.getDeliveryTag(),false);
                    }
                }
            };
            //自動回覆隊列應答 -- RabbitMQ中的消息確認機制
            channel.basicConsume(queueName, false, consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
  • 消息確認

  rabbitMq的消息確認,不管生產者和消費者,消息確認的對象都是broker自己。生產者確保消息準確投遞到broker server,server返回了ack則表示消息已投遞。消費者則是簽收方,broker server確認消息投遞成功後,則在內存中移除這個消息。

  爲了確保消息準確的投遞,rabbitMq提供了兩種消息確認機制。

  confirmSelect

  若是生產者將channel設置成confirm模式,全部在該信道上面發佈的消息都將會被指派一個惟一的ID(deliverId),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理;

       confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息;

  生產者能夠對這兩種響應作出監聽處理。

            //添加一個監聽器
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", ack, multiple :"+multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Producer Send :"+deliveryTag+", nack");
                }
            });

  開啓這種模式,只須要在channel中調用confirmSelect。

            channel.confirmSelect();

  txSelect

  事務模式是一種增長可靠性的手段,它必然會犧牲性能,與數據庫事務的理解差很少。在事務範圍內,保證消息投遞成功。

try {
    channel.txSelect(); // 聲明事務
    // 發送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事務
} catch (Exception e) {
    channel.txRollback();
}

  rollback以後,等因而丟棄了消息。

            //發送消息到隊列中
            for(int i=0;i<5;i++){
                try{
                    channel.txSelect();
                    channel.basicPublish("basic",Properties.routeKey, null, message.getBytes("UTF-8"));
                    if(i % 3 == 0){
                        throw new Exception();
                    }
                    channel.txCommit();
                }catch (Exception e){
                    e.printStackTrace();
                    channel.txRollback();
                }
            }

  改變了代碼以後,失敗的兩條並未進入Ready。

  • 結尾

   因爲時間問題,沒有對各類消息確認機制進行抓包分析,性能分析。以及瞭解rabbitMq的cluster模式,高可用模式等,只是基本上了解了它的api以及概念原理。對於深刻學習有幾個方向,一是集羣擴展橫向擴展方案,二是exchanger集羣同步方案,三是消息堆積處理問題等,由於沒有在實際工做中使用,不少場景都不曾考慮到。消息隊列最典型的應用就是使用來解耦事務,作成基於MQ的2PC分佈式事務,以及實現最終一致方案。在將來會在這方面嘗試。

相關文章
相關標籤/搜索