RabbitMQ 入門

RabbitMQ做爲應用程序之間通訊的工具,愈來愈受歡迎,下面結合介紹一下RabbitMQ中一些簡單的概念。建議初學者能夠看一下RabbitMQ官方教程和官方在GitHub上提供的樣例代碼html

幾個重要概念

生產者

生產者也叫客戶端,不是RabbitMQ的一部分,他建立消息,並將消息發送到給消息代理RabbitMQ。消息主要分爲兩部分:有效載荷和標籤。有效載荷爲要傳輸的數據,標籤用於標明一個交換器(下面會講)的名稱和消息的主題。java

消費者

消費者也叫服務端,一樣不是RabbitMQ的一部分,消費者訂閱到隊列上,消費消息。git

信道

首先必須鏈接到RabbitMQ才能發佈或消費消息。那麼就須要應用程序和Rabbit之間創建一條TCP鏈接。一旦TCP鏈接打開,就創建了一條信道。可是頻繁的創建和斷開TCP鏈接會對系統形成很重的負擔並且操做系統每秒創建的TCP鏈接是有限的。因此RabbitMQ中的信道是創建在TCP上的虛鏈接。RabbitMQ爲每一個信道指派一個ID,發佈,訂閱和接收消息都是在指定的信道上完成的。github

隊列

RabbitMQ 主要包含三個部分:交換器,隊列和綁定。生產者吧消息發佈到交換器上(注意是交換器,不是隊列),消息最終到達隊列(期間有相似路由的過程),並被消費者接收。網絡

交換器

交換器相似網絡中的路由器,隊列經過路由鍵綁定到交換器,RabbitMQ 根據以前發送消息時定義的路由鍵——route key,肯定將消息投遞到哪一個隊列。若是路由的消息不匹配全部綁定的路由鍵,消息將會被丟棄。交換器主要有headers,direct,fanout,topic四種。併發

headers

headers交換器時使用的比較少的一種交換器,他是一種忽略route key使用headers的鍵值對匹配。ide

direct

direct交換器很是簡單,若是路由鍵匹配的話,消息就被投遞到對應的隊列。使用方法有兩種,一種是單一綁定,另外一種是多綁定。工具

單一綁定

意思是,不一樣的隊列綁定到同一個交換器上。這樣當不一樣route key 的消息到達交換器時,交換器根據不一樣的route key 將消息放到不一樣的隊列中。
ui

多綁定

將兩個不一樣的隊列用相同的route key 綁定在同一個交換器上,這是交換器會向具備相同的route key的隊列上廣播匹配route key 的消息。這種設置的做用在fanout交換器中介紹。
操作系統

fanout

和 direct的多綁定相同,它會將受到的消息廣播給全部綁定的隊列上。這用交換器的做用是容許對單條消息做出不一樣的反應。例如,當你在淘寶下一個訂單時,首先淘寶要告訴銀行向商家轉一筆錢,同時還要告訴商家要準備商品,併發貨。在這種狀況下,一個下訂單的消息發送到RabbitMQ中後,須要做出兩個反應:轉帳和準備商品。而fanout模式的很好的解決了這個問題。rabbit將消息分別發送到銀行和商家綁定的兩個隊列就可完成這個功能。

topic

topic,顧名思義就是根據隊列在綁定到交換器時,定義了本身感興趣的話題。他的route key 支持正則匹配方式。例如"stock.used.nyse","nyse.vmw"。你能夠根據喜愛在route key 中包含多個用「點」隔開的單詞(最多255個)。

  • *(star) 表示只替代一個單詞,例如"chris.*",能夠匹配"chris.name" 但不能夠匹配"chris" 或者"chris.age.eight"
  • #(hash) 表示能夠匹配0個或多個單詞,例如"chris.#",能夠匹配"chris",也能夠匹配"chris.age.eight",還能夠匹配"chris.name"

綁定

綁定的意思就是將隊列綁定到交換器上,當消息到達交換器後,會根據不一樣交換器的類型,以及在綁定時定義的route key,將消息分發到不一樣的隊列中取。在java API中,代碼以下:

// only care about the message with bindingKey
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

代碼樣例

下面用RabbitMQ官方提供的代碼(文章開頭的GitHub)來解釋一下Java API的使用。

  • 消息發送者(生產者)
ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // 建立鏈接
    Connection connection = factory.newConnection();
    // 在鏈接上建立信道
    Channel channel = connection.createChannel();
    // 聲明交換器參數意義:交換器名稱,交換器類型
    channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
    // 聲明隊列 參數的意義依次是:
    // 隊列名稱: 在往交換器上綁定時使用
    // durable: 若是爲true的話,隊列中的消息在關閉rabbit服務時會持久化,
    // exclusive: 設置爲true的話,隊列爲私有的只有當前程序才能消費隊列消息
    // autoDelete 當最後一個消費者取消訂閱是,隊列自動刪除
    // arguments: 隊列的其餘屬性
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    // 發佈消息 參數意義以下:
    // exchange: 消息發佈到那個交換器上
    // route key: 路由鍵,exchange 根據其路由消息
    // props: 消息的其餘屬性,headers 會用到
    // body: 消息內容
    channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  • 消息接收者(消費者)
ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 消費者端的代碼不須要聲明交換器
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    // 建立消費者
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                    AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    };
    // 隊列須要有消費者,消息發送者不須要
    // 第二個參數爲auto_ack: 設置爲true時,一旦消費者接收了消息,RabbitMQ會自動視爲其確認,若是確認了消息將從隊列中移除。
    channel.basicConsume(QUEUE_NAME, true, consumer);
相關文章
相關標籤/搜索