咱們用java來舉例子, 打個比方咱們客戶端發送一個下單請求給訂單系統(order)訂單系統發送了 一個請求給咱們的庫存系統告訴他須要更改庫存了,我已經下單了,這裏,每個請求咱們均可以看做一條消息,可是咱們客戶端須要等待訂單系統告訴我這條消息的處理結果(我到底有沒有下單成功) 可是 訂單系統不須要知道庫存系統這條消息的處理狀況 由於不管你庫存有沒有改動成功, 我訂單仍是下了, 由於是先下完了訂單(下成功了) 纔去更改庫存, 庫存若是更改出BUG了 那是庫存系統的問題, 這個BUG不會影響訂單系統。 若是這裏你能理解的話,那麼咱們就能發現咱們用戶發送的這條消息(下訂單),是須要同步的(我須要知道結果),訂單發送給庫存的消息,是能夠異步的(我不想知道你庫存到底改了沒,我只是通知你我這邊成功下了一個訂單)那麼若是咱們還按原來的方式去實現這個需求的話, 那麼結果會是這樣:html
那可能有同窗說了, 咱們訂單系統開闢線程去訪問庫存系統不就行了嗎?java
使用線程池解決 確實能夠, 可是也有他的缺點, 那麼 到底怎麼來完美解決這個問題呢?redis
若是這張圖能理解的話, 那麼 這個消息系統, 就是咱們的消息中間件。spring
導語:咱們剛剛介紹了什麼是消息中間件, 那麼RabbitMq就是對於消息中間件的一種實現,市面上還有不少不少實現, 好比RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ等等 咱們這節主要講RabbitMqsql
AMQP編程
這裏引用百度的一句話 再加以個人理解: AMQP 其實和Http同樣 都是一種協議, 只不過 Http是針對網絡傳輸的, 而AMQP是基於消息隊列的 AMQP 協議中的基本概念: •Broker: 接收和分發消息的應用,咱們在介紹消息中間件的時候所說的消息系統就是Message Broker。 •Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,相似於網絡中的namespace概念。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange/queue等。 •Connection: publisher/consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。 •Channel: 若是每一次訪問RabbitMQ都創建一個Connection,在消息量大的時候創建TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。 •Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 •Queue: 消息最終被送到這裏等待consumer取走。一個message能夠被同時拷貝到多個queue中。 •Binding: exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。 Exchange的類型: direct : 這種類型的交換機的路由規則是根據一個routingKey的標識,交換機經過一個routingKey與隊列綁定 ,在生產者生產消息的時候 指定一個routingKey 當綁定的隊列的routingKey 與生產者發送的同樣 那麼交換機會吧這個消息發送給對應的隊列。 fanout: 這種類型的交換機路由規則很簡單,只要與他綁定了的隊列, 他就會吧消息發送給對應隊列(與routingKey不要緊) topic:(由於*在這個筆記軟件裏面是關鍵字,因此下面就用星替換掉了) 這種類型的交換機路由規則也是和routingKey有關 只不過 topic他能夠根據:星,#( 星號表明過濾一單詞,#表明過濾後面全部單詞, 用.隔開)來識別routingKey 我打個比方 假設 我綁定的routingKey 有隊列A和B A的routingKey是:星.user B的routingKey是: #.user 那麼我生產一條消息routingKey 爲: error.user 那麼此時 2個隊列都能接受到, 若是改成 topic.error.user 那麼這時候 只有B能接受到了 headers: 這個類型的交換機不多用到,他的路由規則 與routingKey無關 而是經過判斷header參數來識別的, 基本上沒有應用場景,由於上面的三種類型已經能應付了。瀏覽器
RabbitMQ MQ: message Queue 顧名思義 消息隊列, 隊列你們都知道, 存放內容的一個東西, 存放的內容先進先出, 消息隊列, 只是裏面存放的內容是消息而已。 RabbitMq 是一個開源的 基於AMQP協議實現的一個完整的企業級消息中間件,服務端語言由Erlang(面向併發編程)語言編寫 對於高併發的處理有着自然的優點,客戶端支持很是多的語言: •Python •Java •Ruby •PHP •C# •JavaScript •Go •Elixir •Objective-C •Swift安全
RabbitMQ服務端部署網絡
在介紹消息中間件的時候所提到的「消息系統」 即是咱們這節的主題:RabbitMq 如同redis同樣 他也是採用c/s架構 由服務端 與客戶端組成, 咱們如今咱們計算機上部署他的服務端 因爲咱們剛剛介紹過了RabbitMQ服務端是由Erlang語言編寫因此咱們這裏先下載Erlang語言的環境 注意:若是是在官網下的RabbitMQ服務端的話 Erlang語言的版本不能過低, 否則要卸載掉舊的去裝新的, 咱們這裏下載OTP21.0版本直接從外網下載會很慢, 我這裏直接貼上百度網盤的地址(由於這個東西仍是有點大的) pan.baidu.com/s/1pZJ8l2f3… 咱們再去官網下載 他的服務端安裝包 www.rabbitmq.com/download.ht… 根據本身的系統選擇下載便可 注意! 須要先下載Erlang再下載安裝包安裝, 否則安裝RabbitMQ服務端的時候會提示你本地沒有Erlang環境多線程
安裝的話, 基本上就是默認的選項不用改 如何看RabbitMq安裝完成了? 在系統-服務中找到以下便可:
包括啓動 中止 重啓 服務等
RabbitMQ安裝會附帶一個管理工具(方便咱們能直觀的查看整個RabbitMQ的運行狀態和詳細數據等,有點像Navicat 對應Mysql的關係) 值得一提的是, 管理工具和RabbitMQ是兩碼事 但願同窗們不要混稀了。 管理工具啓動方式: 到大家安裝的 RabbitMQ Server\rabbitmq_server-3.7.12\sbin 目錄下面 執行一條cmd命令: rabbitmq-plugins enable rabbitmq_management 直接複製這條命令便可 , 固然 嫌每次都要去目錄中去執行的麻煩的話, 能夠配置一個環境變量 或者在咱們的開始菜單欄中找到這個:
輸入完啓動命令後 稍微等一下會有結果返回 而後能夠打開瀏覽器 輸入 http://127.0.0.1:15672 訪問管理頁面:
默認帳號密碼都是 guest 即 username :guest password:guest 登陸進去以後會看到以下界面(由於我不當心裝了2次RabbitMq 因此這裏能看到都重複了, 大家本身那不會重複,而後咱們剛剛說了 管理工具和rabbitmq 是兩碼事 因此端口也就不同)
這個頁面在筆記裏面介紹起來可能比較複雜, 就不一一介紹了, 我這裏講個重點, 就是線上環境下必定要吧guest用戶(固然 guest這個用戶只能本機才能登錄)刪掉而且新加一個用戶, 這裏就演示一下這個功能 首先 點擊admin頁籤, 在下面找到Add User
而後輸入帳號 密碼 確認密碼 這個Tags實際上是一個用戶權限標籤, 關於他的介紹能夠看官方介紹(點旁邊那個小問號就行了,我這裏直接翻譯他的介紹)
填寫完以後點擊AddUser 就能夠添加一個用戶了, 添加完用戶以後還要給這個用戶添加對應的權限(注:Targ不等於權限) 好比說 我剛剛添加了一個jojo角色
點擊這個jojo能夠進去給他添加權限 這個權限能夠是 Virtual host 級別的 也能夠是交換機級別的 甚至是細化到某一個讀寫操做 我這裏就給他添加一個Virtual host權限
這裏 咱們給了他 testhost這個Virtual host的權限 正則匹配都是* 也就是全部權限 而後點擊set添加完畢 那麼管理頁面 咱們就講到這裏
由於咱們這裏是用java來做爲客戶端, 咱們首先引入maven依賴: <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.2</version> </dependency>
(注意的是, 我這裏引入的是5.x的rabbitmq客戶端版本, 那麼咱們jdk的版本最好在8以上,反之, 這裏就建議使用4.x的版本,這裏僅僅討論jdk8 其餘的版本不作討論) 首先 咱們編寫一個鏈接的工具類: `package com.luban.util;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;
/**
須要諮詢java高級VIP課程的同窗能夠木蘭老師的QQ:2746251334
須要往期視頻的同窗能夠加加安其拉老師的QQ:3164703201
author:魯班學院-商鞅老師 */ public class ConnectionUtil {
public static final String QUEUE_NAME = "testQueue";
public static final String EXCHANGE_NAME = "exchange";
public static Connection getConnection() throws Exception{ //建立一個鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); //設置rabbitmq 服務端所在地址 我這裏在本地就是本地 connectionFactory.setHost("127.0.0.1"); //設置端口號,鏈接用戶名,虛擬地址等 connectionFactory.setPort(5672); connectionFactory.setUsername("jojo"); connectionFactory.setPassword("jojo"); connectionFactory.setVirtualHost("testhost"); return connectionFactory.newConnection(); }
}`
而後咱們編寫一個消費者(producer),和一個生產者(consumer): 生產者: `public class Consumer {
public static void sendByExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
// 聲明exchange
channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
//交換機和隊列綁定
channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("發送的信息爲:" + message);
channel.close();
connection.close();
}
複製代碼
} `
消費者: `public class Producer {
public static void getMessage() throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
複製代碼
// channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null); DefaultConsumer deliverCallback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }; channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback); }
} `
這裏, 咱們演示綁定fanout的類型的交換機, 因此不須要routingKey 就能夠路由只須要綁定便可 (可能有同窗要問了, 若是沒有綁定交換機怎麼辦呢? 沒有綁定交換機的話, 消息會發給rabbitmq默認的交換機裏面 默認的交換機隱式的綁定了全部的隊列,默認的交換機類型是direct 路由建就是隊列的名字) 基本上這樣子的話就已經進行一個快速入門了, 因爲咱們如今作項目基本上都是用spring boot(就算沒用spring boot也用spring 吧) 因此後面咱們直接基於spring boot來說解(rabbitmq的特性,實戰等)