本文講述的只是主要是 RabbitMQ
的入門知識,學習本文主要能夠掌握如下知識點:html
大部分技術的剛產生時適用範圍都是特定的。好比互聯網的產生,剛開始出現的通訊協議各個產商之間是沒法兼容的,隨着歷史的發展,產生了業內的通訊標準tcp/ip
協議,而MQ
也是同樣,第一款 MQ
類軟件是由一個在美國印度人 Vivek Ranadive
創辦的一家公司 Teknekron
,並實現了世界上第一個消息中間件 The Information Bus(TIB)
。java
隨着第一款MQ
類軟件TIB
的誕生,各大廠商馬上跟進,百花爭鳴,涌現了一批MQ
類軟件,好比IBM
開發的IBM Wesphere
,微軟開發的MSMQ
等等,可是正由於標準不統一,這就給咱們使用者帶來了很大的不便,每次切換MQ
時都須要重複去實現不一樣的協議和不一樣API的調用。web
2001
年,Java
語言的老東家Sun
公司發佈了一個JMS
規範,其在各大產商的MQ
上進行了統一封裝,使用者若是須要使用不一樣的MQ
只須要選擇不一樣的驅動就能夠了(和咱們使用數據庫驅動一個道理)。JMS
規範雖然統一了標準,可是JMS
規範卻有一個很大的缺陷就是它是和Java
語言進行綁定的,因此依然沒有從根本上解決問題。spring
2004
年,AMQP
規範出現了,真正作到了跨語言和跨平臺,自此MQ
迎來了發展的黃金時代。數據庫
2007
年,Rabbit
公司基於AMQP
規範開發出了一款消息隊列RabbmitMQ
。很快的,RabbitMQ
就獲得了你們的喜好,被用戶普遍使用。編程
MQ
即:Message Queue
,稱之爲消息隊列或者消息中間件。MQ
的本質是:使用高效可靠的消息傳遞機制來進行與平臺無關的數據傳遞,並基於數據通訊來進行分佈式系統的集成。也就是說MQ
主要是用來解決消息的通訊問題,其主要有如下三個特色:vim
MQ
是一個獨立運行的服務。經過生產者來發送消息,使用消費者來接收消費。FIFO
)隊列。以Java
語言爲例,JDK
自己就提供了許多不一樣類型的隊列,那麼爲何還須要用MQ
呢?這是由於:api
MQ
。RabbitMQ
中的 Rabbit
是兔子的意思,就是形容跑的和兔子同樣快。其是一款輕量級的,支持多種消息傳遞協議的高可用的消息隊列。RabbitMQ
是由 Erlang
語言編寫的,而 Erlang
語言就是一款天生適合高併發的語言。安全
RabbitMQ
做爲一款很是流行的消息中間件,其有着很是豐富的特性和優點:springboot
RabbitMQ
提供了持久化、發送應答、發佈確認等功能來保證其可靠性。crash
整個集羣仍然可用。RabbitMQ
最初是爲了支持 AMQP
協議而開發的,因此 AMQP
是其核心協議,可是其也支持其餘如:STOMP
,MOTT
,HTTP
等協議。RabbitMQ
幾乎支持全部經常使用語言客戶端,如:Java
,Python
,Ruby
,Go
等。RabbitMQ
後臺管理系統就是以插件的形式實現的。AMQP 全稱是:Advanced Message Queuing Protocol。RabitMQ
最核心的協議就是基於 AMQP
模型的 AMQP
協議,AMQP
模型目前最新的版本是 1.0
版本,可是目前官方推薦使用者的最佳版本還是基於 0.9.1
版本的 AMQP
模型,0.9.1
版本在 RabbitMQ
官網中也將其稱之爲 AMQP 0-9-1
模型。
AMQP 0-9-1
(高級消息隊列協議)是一種消息傳遞協議,它容許符合標準的客戶端應用程序與符合標準的消息傳遞中間件代理進行通訊。消息傳遞代理(Broker)從發佈者(Publisher,即發佈消息的應用程序,也稱爲生產者:Producter)接收消息,並將其路由到使用者(消費者:Consumer,即處理消息的應用程序)。
AMQP 0-9-1
模型的核心思想爲:消息被髮布到交換處,一般被比做郵局或郵箱。而後,交換機使用稱爲綁定的規則將消息副本分發到隊列。而後,代理將消息傳遞給訂閱了隊列的使用者,或者使用者根據須要從隊列中獲取/提取消息。
下圖就是一個 AMQP
模型簡圖,理解了這幅圖,那麼就基本理解了 RabbitMQ
的工做模式。
Producer
即生產者,通常指的是應用程序客戶端,生產者會產生消息發送給 RabbitMQ
,等待消費者進行處理。
Consumer
即消費者,消費者會從特定的隊列中取出消息,進行消費。當消息傳遞給消費者時,消費者會自動通知 Broker
,Broker
只有在收到關於該消息的通知時纔會從隊列中徹底刪除該消息。
生產者發送消息和消費者接收消息以前都必需要和 Broker
創建一個 tcp
長鏈接,才能進行通訊。
消息隊列的做用之一就是用來作削峯,因此消息隊列在高併發場景可能會有大量的生產者和消費者,那麼假如每個生產者在發送消息時或者每個消費者在消費消息時都須要不斷的建立和銷燬 tcp
鏈接,那麼這對 Broker
會是一個很大的消耗,爲了下降這個 tcp
鏈接的建立頻率,AMQP
模型引入了 Channel
(通道或者信道)。
Channel
是一個虛擬的的鏈接,能夠被認爲是「輕量級的鏈接,其共享同一個 tcp
鏈接」。在同一個 tcp
長鏈接裏面能夠經過建立和銷燬不一樣的 Channel
來減小了建立和銷燬 tcp
鏈接的頻率,從而大大減小了資源的消耗。
客戶端(生產者/消費者)執行的每一個協議操做都發生在通道上。特定 Channel
上的通訊徹底獨立於另外一個 Channel
上的通訊,所以每一個協議方法還攜帶一個Channel ID
(又稱通道號)。
Channel
只存在於鏈接的上下文中,不會獨立存在,因此當一個 tcp
鏈接被關閉時,其中全部 Channel
也都會被關閉。
Channel
是線程不安全的,因此對於使用多個線程/進程進行處理的應用程序,須要爲每一個線程/進程建立一個 Channel
,而不是共享同一個 Channel
。
Broker
直接翻譯成中文就是:中介/代理,因此若是咱們使用的是 RabbitMQ
,那麼這個 Broker
就是指的 RabbitMQ
服務端。
Echange
即交換機,由於要實現生產者和消費者的多對多關係,因此只有一個隊列是沒法知足要求的,那麼若是有多個隊列,每次咱們發送的消息應該存儲到哪裏呢?交換機就是起到了中間角色的做用,咱們發送消息到交換機上,而後經過交換機發送到對應的隊列,交換機和隊列之間須要提早綁定好對應關係,這樣消息就到了各自指定的隊列內,而後消費者就能夠直接從各自負責的隊列內取出消息進行消費。
消息發送到 Broker
以後,經過交換機的映射,存儲到指定的 Queue
裏面。
VHost
相似於命名空間,主要做用就是用來隔離數據的,好比咱們由不少個業務系統都須要用到 RabbitMQ
,若是是一臺服務器徹底能夠知足要求,那就不必安裝多個 RabbitMQ
了,這時候就能夠定義不一樣的 VHost
,不一樣的 VHost
就能夠實現各個業務系統間的數據隔離。
RabbitMQ
是用 Erlang
語言開發的,因此在安裝 RabbitMQ
以前,須要先安裝 Erlang
,RabbitMQ
和 Erlang
之間有版本對應關係,這個須要注意,本文以 Erlang 21.3
和 RabbitMQ3.8.4
爲例進行安裝 。
Erlang
:yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget //提早安裝一些依賴,我的電腦依賴不一樣,可根據實際狀況選擇未安裝的依賴進行安裝 wget http://erlang.org/download/otp_src_21.3.tar.gz # 下載(也能夠下載好傳到服務器) tar -xvf otp_src_21.3.tar.gz //解壓 mkdir erlang //在指定目錄,如/usr/local下建立erlang目錄 cd otp_src_21.3 //切換到解壓後的目錄 ./configure --prefix=/usr/local/erlang //編譯(路徑根據實際狀況選擇) make && make install //安裝
Erlang
環境變量:vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其餘系統可能不同) export PATH=$PATH:/usr/local/erlang/bin //在末尾加入環境變量配置(路徑根據實際狀況選擇) source /etc/profile //實時生效
erl
驗證 Erlang
是否安裝成功。若是出現以下顯示版本號的界面則說明安裝成功(能夠輸入 halt().
命令進行退出):RabbitMQ
:wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.4/rabbitmq-server-generic-unix-3.8.4.tar.xz //下載RabbitMQ xz -d rabbitmq-server-generic-unix-3.8.4.tar.xz //解壓 tar -xvf rabbitmq-server-generic-unix-3.8.4.tar //解壓
vim /etc/profile //編輯環境變量文件(CentOS系統默認環境變量文件,其餘系統可能不同) export PATH=$PATH:/usr/local/rabbitmq_server-3.8.4/sbin //在末尾加入環境變量配置(路徑根據實際狀況選擇) source /etc/profile //實時生效
RabbitMQ
,默認端口爲 6752
:/usr/local/rabbitmq_server-3.8.4/sbin/./rabbitmq-server -detached //在後臺啓動。根據本身實際路徑選擇,或者也能夠選擇service或者systemctl等命令啓動
guest/guest
帳戶,只能本地鏈接,因此還須要再從新建立一個用戶,並給新用戶受權(固然,咱們也能夠直接給 guest
用戶受權):./rabbitmqctl add_user admin 123456 //建立用戶admin ./rabbitmqctl set_user_tags admin administrator //添加標籤 ./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" //受權
RabbitMQ
默認還提供了可視化管理界面,須要手動開啓一下,默認端口爲 15672
:./rabbitmq-plugins enable rabbitmq_management //啓動後臺管理系統插件(禁止的話換成disable便可)
http://ip:15672/
訪問後臺管理系統,並進行一些參數設置,帳號密碼就是上面添加的 admin/123456
。安裝過程當中可能會出現以下圖所示錯誤:
odbc:ODBC library - link check failed:
解決方法:執行命令 yum install unixODBC.x86_64 unixODBC-devel.x86_64
進行安裝。
wx:wxWidgets not found, wx will NOT be usable:
解決方法:這個屬於 APPLOICATION INFORMATION
,能夠不處理。
fakefop to generate placeholder PDF files,documentation: fop is missing.Using fakefop to generate placeholder PDF files:
解決方法:執行命令 yum install fop.noarch
進行安裝。
接下來用 Java
原生的 API
來實現一個簡單的生產者和消費者:
pom.xml
文件引入RabbitMQ
客戶端依賴:<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
TestRabbitConsumer
類:package com.lonelyWolf.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; public class TestRabbitConsumer { public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://admin:123456@ip:5672"); Connection conn = factory.newConnection();//創建鏈接 Channel channel = conn.createChannel(); //建立消息通道 channel.queueDeclare("TEST_QUEUE", false, false, false, null);//聲明隊列 System.out.println("正在等待接收消息..."); Consumer consumer = new DefaultConsumer(channel) {//建立消費者 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { System.out.println("收到消息: " + new String(body, "UTF-8") + ",當前消息ID爲:" + properties.getMessageId()); System.out.println("收到自定義屬性:"+ properties.getHeaders().get("name")); } }; channel.basicConsume("TEST_QUEUE", true, consumer);//消費以後,回調給consumer } }
TestRabbitProducter
類:package com.lonelyWolf.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class TestRabbitProducter { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://admin:123456@ip:5672"); Connection conn = factory.newConnection();// 創建鏈接 Channel channel = conn.createChannel();//建立消息通道 Map<String, Object> headers = new HashMap<String, Object>(1); headers.put("name", "雙子孤狼");//能夠自定義一些自定義的參數和消息一塊兒發送過去 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .contentEncoding("UTF-8") //編碼 .headers(headers) //自定義的屬性 .messageId(String.valueOf(UUID.randomUUID()))//消息id .build(); String msg = "Hello, RabbitMQ";//須要發送的消息 channel.queueDeclare("TEST_QUEUE", false, false, false, null); //聲明隊列 channel.basicPublish("", "TEST_QUEUE", properties, msg.getBytes());//發送消息 channel.close(); conn.close(); } }
RabbitMQ
的鏈接,等待消息;而後再運行生產者,消息發送以後,消費者就能夠收到消息:接下來再看看 SpringBoot
怎麼與 RabbitMQ
集成並實現一個簡單的生產者和消費者:
SpringBoot
用的是 2.4.0
版本,因此若是用的低版本這個版本號也須要修改):<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.4.0</version> </dependency>
spring: rabbitmq: host: ip port: 5672 username: admin password: 123456
RabbitConfig
類,建立一個隊列:package com.lonely.wolf.rabbit.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean("simpleRabbitQueue") public Queue getFirstQueue(){ Queue queue = new Queue("SIMPLE_QUEUE"); return queue; } }
SimpleConsumer
類(注意這裏監聽的名字要和上面定義的保持一致):package com.lonely.wolf.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @RabbitListener(queues = "SIMPLE_QUEUE") @Component public class SimpleConsumer { @RabbitHandler public void process(String msg){ System.out.println("收到消息:" + msg); } }
HelloRabbitController
類(發送消息的隊列名要和消費者監聽的隊列名一致,不然沒法收到消息),運行以後調用對應接口,消費者類 SimpleConsumer
就能夠收到消息:package com.lonely.wolf.rabbit.controller; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/hello") public class HelloRabbitController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/send") public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){ rabbitTemplate.convertAndSend("SIMPLE_QUEUE",msg); return "succ"; } }
本文主要簡單講述了 MQ
的發展歷史,並介紹了爲何要使用 MQ
及 MQ
能解決什麼問題,緊接着重點介紹了 AMQP 0.9.1
模型。掌握了 AMQP
模型就基本掌握了 RabbitMQ
的工做原理,最後咱們經過 JAVA API
和 SpringBoot
兩個例子介紹瞭如何使用 RabbitMQ
。