RabbitMQ做爲應用程序之間通訊的工具,愈來愈受歡迎,下面結合介紹一下RabbitMQ中一些簡單的概念。建議初學者能夠看一下RabbitMQ官方教程和官方在GitHub上提供的樣例代碼。html
生產者也叫客戶端,不是RabbitMQ的一部分,他建立消息,並將消息發送到給消息代理RabbitMQ。消息主要分爲兩部分:有效載荷和標籤。有效載荷爲要傳輸的數據,標籤用於標明一個交換器(下面會講)的名稱和消息的主題。java
消費者也叫服務端,一樣不是RabbitMQ的一部分,消費者訂閱到隊列上,消費消息。git
首先必須鏈接到RabbitMQ才能發佈或消費消息。那麼就須要應用程序和Rabbit之間創建一條TCP鏈接。一旦TCP鏈接打開,就創建了一條信道。可是頻繁的創建和斷開TCP鏈接會對系統形成很重的負擔並且操做系統每秒創建的TCP鏈接是有限的。因此RabbitMQ中的信道是創建在TCP上的虛鏈接。RabbitMQ爲每一個信道指派一個ID,發佈,訂閱和接收消息都是在指定的信道上完成的。github
RabbitMQ 主要包含三個部分:交換器,隊列和綁定。生產者吧消息發佈到交換器上(注意是交換器,不是隊列),消息最終到達隊列(期間有相似路由的過程),並被消費者接收。網絡
交換器相似網絡中的路由器,隊列經過路由鍵綁定到交換器,RabbitMQ 根據以前發送消息時定義的路由鍵——route key,肯定將消息投遞到哪一個隊列。若是路由的消息不匹配全部綁定的路由鍵,消息將會被丟棄。交換器主要有headers,direct,fanout,topic四種。併發
headers交換器時使用的比較少的一種交換器,他是一種忽略route key使用headers的鍵值對匹配。ide
direct交換器很是簡單,若是路由鍵匹配的話,消息就被投遞到對應的隊列。使用方法有兩種,一種是單一綁定,另外一種是多綁定。工具
意思是,不一樣的隊列綁定到同一個交換器上。這樣當不一樣route key 的消息到達交換器時,交換器根據不一樣的route key 將消息放到不一樣的隊列中。
ui
將兩個不一樣的隊列用相同的route key 綁定在同一個交換器上,這是交換器會向具備相同的route key的隊列上廣播匹配route key 的消息。這種設置的做用在fanout交換器中介紹。
操作系統
和 direct的多綁定相同,它會將受到的消息廣播給全部綁定的隊列上。這用交換器的做用是容許對單條消息做出不一樣的反應。例如,當你在淘寶下一個訂單時,首先淘寶要告訴銀行向商家轉一筆錢,同時還要告訴商家要準備商品,併發貨。在這種狀況下,一個下訂單的消息發送到RabbitMQ中後,須要做出兩個反應:轉帳和準備商品。而fanout模式的很好的解決了這個問題。rabbit將消息分別發送到銀行和商家綁定的兩個隊列就可完成這個功能。
topic,顧名思義就是根據隊列在綁定到交換器時,定義了本身感興趣的話題。他的route key 支持正則匹配方式。例如"stock.used.nyse","nyse.vmw"。你能夠根據喜愛在route key 中包含多個用「點」隔開的單詞(最多255個)。
綁定的意思就是將隊列綁定到交換器上,當消息到達交換器後,會根據不一樣交換器的類型,以及在綁定時定義的route key,將消息分發到不一樣的隊列中取。在java API中,代碼以下:
// only care about the message with bindingKey channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
下面用RabbitMQ官方提供的代碼(文章開頭的GitHub)來解釋一下Java API的使用。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 建立鏈接 Connection connection = factory.newConnection(); // 在鏈接上建立信道 Channel channel = connection.createChannel(); // 聲明交換器參數意義:交換器名稱,交換器類型 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); // 聲明隊列 參數的意義依次是: // 隊列名稱: 在往交換器上綁定時使用 // durable: 若是爲true的話,隊列中的消息在關閉rabbit服務時會持久化, // exclusive: 設置爲true的話,隊列爲私有的只有當前程序才能消費隊列消息 // autoDelete 當最後一個消費者取消訂閱是,隊列自動刪除 // arguments: 隊列的其餘屬性 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; // 發佈消息 參數意義以下: // exchange: 消息發佈到那個交換器上 // route key: 路由鍵,exchange 根據其路由消息 // props: 消息的其餘屬性,headers 會用到 // body: 消息內容 channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 消費者端的代碼不須要聲明交換器 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立消費者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; // 隊列須要有消費者,消息發送者不須要 // 第二個參數爲auto_ack: 設置爲true時,一旦消費者接收了消息,RabbitMQ會自動視爲其確認,若是確認了消息將從隊列中移除。 channel.basicConsume(QUEUE_NAME, true, consumer);