RabbitMQ 是一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。本文介紹RabbitMQ 安裝和使用。php
RabbitMQ 是一個開源的AMQP
實現,服務器端用Erlang
語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。html
能夠把消息隊列想象成郵局,你的筆友把信件投遞到郵局,郵遞員源源不斷地進出郵局,把筆友的信送到你的手裏。此時的筆友就是一個生產者(Product),郵遞員一次送信就是(Queue),而你收信就像是消費者(Consumer)。java
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。c++
RabbitMQ 則是一個開源的 AMQP 實現。git
一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange)。這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。github
經過 RabbitMQ 官網 的示例中看到 RabbitMQ 有六種模式。spring
官網中有多種語言的實現,本文用 Java 來實現。採用 Springboot 集成 RabbitMQ。apache
yum updatecentos
yum install epel-release安全
yum install gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git wget wxBase.x86_64
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum update
yum install erlang
驗證是否安裝成功,輸入命令:erl
由於 EPEL 中的 Elixir 版本太老,因此下面是經過源碼編譯安裝的過程:
git clone https://github.com/elixir-lang/elixir.git
cd elixir/
make clean test
export PATH=」$PATH:/usr/local/elixir/bin」
驗證是否安裝成功,輸入命令:iex
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
rpm –import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.6.1-1.noarch.rpm
至此已經安裝完成,下面介紹啓動和自動開機啓動命令和配置
啓動:
systemctl start rabbitmq-server
開機自動啓動:
systemctl enable rabbitmq-server
查看 rabbitmq-server 狀態:
rabbitmqctl status
關閉:
systemctl enable rabbitmq-server
能夠直接經過配置文件的訪問進行管理,也能夠經過Web的訪問進行管理。
經過Web進行管理,開啓 Web 管理:
rabbitmq-plugins enable rabbitmq_management
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/
注:先啓動 RabbitMQ
訪問:http://192.168.2.223:15672/
,默認用戶 guest ,密碼 guest。
發現登陸失敗,因爲帳號guest具備全部的操做權限,而且又是默認帳號,出於安全因素的考慮,guest用戶只能經過localhost登錄使用。
咱們新增一個用戶:
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin 「.「 「.「 「.*」
假設如今已經按照前面的步驟完成了 RabbitMQ 的安裝,如今開始使用 Springboot 集成 RabbitMQ。
IDEA 先新建一個 maven 項目,在 pom 文件中添加相關依賴:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.shuiyujie</groupId> <artifactId>pom</artifactId> <version>1.0-SNAPSHOT</version> <name>pom</name> <!-- Spring Boot 啓動父依賴 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- Spring Boot Test 依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project> |
1 2 3 4 5 6 |
# rabbitmq 配置文件 spring.rabbitmq.host=192.168.0.223 # 默認端口 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 |
如今咱們的目標很簡單就是建立一個生產者 P,和一個消費者 C,同時將 P 產生的消息放到隊列中供 C 使用。
Queue
1 2 3 4 5 6 7 8 9 10 11 |
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } } |
HelloSender
1 2 3 4 5 6 7 8 9 10 11 12 |
@Controller public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } } |
HelloReceiver
1 2 3 4 5 6 7 8 |
@Component public class HelloReceiver { @RabbitHandler @RabbitListener(queues = "hello") public void process(String hello) { System.out.println("Receiver : " + hello); } } |
運行
1 2 3 4 5 6 7 8 9 10 11 12 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = HelloApplication.class) public class RabbitmqApplicationTests { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } } |
成功接收到消息
1 |
Receiver : hello Thu Feb 01 22:21:39 CST 2018 |
注意:HelloReceiver
的@RabbitListener(queues = "hello")
註解是方法級的,參照別的文章都是類級別的註解致使一直沒法正常鏈接。
Work Queues
模式在原來的基礎上多增長了一個消費者。同理咱們能夠擴展三個、四個甚至更多的consumer
。這樣作的好處在於,當咱們使用一個consumer
的時候,當它收到一條消息進行處理的時候會發生阻塞。有多個consumer
時,消息就能夠分發給空閒的consumer
進行處理。
生產者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/** * Work 模式下的生產者 * * @author shui * @create 2018-02-04 **/ @Controller public class WorkSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "work "; System.out.println("Sender : " + context + "*****" + i); this.rabbitTemplate.convertAndSend("work", context); } } |
Queue
1 2 3 4 5 6 7 |
@Configuration public class WorkConfig { @Bean public Queue workQueue() { return new Queue("work"); } } |
兩個消費者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Component public class WorkReceicer1 { @RabbitHandler @RabbitListener(queues = "work") public void process(String message) { System.out.println("Work Receiver1 : " + message); } } @Component public class WorkReceicer2 { @RabbitHandler @RabbitListener(queues = "work") public void process(String message) { System.out.println("Work Receiver2 : " + message); } } |
測試
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class RabbitMQDirectTest { @Autowired private WorkSender workSender; @Test public void sendWorkTest() { for (int i = 0; i < 20; i++) { workSender.send(i); } } } |
結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work |
發現消費得很平均,每一個consumer
處理一半的消息。
從上面的兩個例子咱們看到producer
產生的消息直接發送給queue
,而後queue
又直接將消息傳給consumer
。RabbitMQ 的亮點就在於改變了上面這種消息傳遞的方式,producer
不會將消息直接傳給queue
而是傳給exchanges
再由exchangers
傳給queue
。然而咱們在前面的兩個例子中並無使用exchanges
,那是由於 RabbitMQ 有默認的exchanges
,只要咱們傳的參數是""
。在默認模式下,不須要將exchanges
作任何綁定。除此以外exchanges
有如下幾種類型:
- Direct:direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的 routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
- Topic:按規則轉發消息(最靈活)
- Headers:設置header attribute參數類型的交換機
- Fanout:轉發消息到全部綁定隊列
Queue
如下使用的是Fanout Exchange
轉發消息到全部綁定隊列。這裏要配置兩個queue
,而且配置exchanges
,並把queue
和exchanges
綁定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
/** * * public/subscribe 模式 * * @author shui * @create 2018-02-04 **/ @Configuration public class FanoutConfig { /************************************************************************ * 新建隊列 fanout.A 、fanout.B ************************************************************************/ @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } /** * 創建一個交換機 * * @return */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /************************************************************************ * 將 fanout.A 、 fanout.B 綁定到交換機 fanoutExchange 上 ************************************************************************/ @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } } |
生產者
在建立producter
的時候,要將他和exchanges
綁定。
1 2 3 4 5 6 7 8 9 10 11 |
@Controller public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); } } |
兩個消費者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@Component public class FanoutReceiveA { @RabbitHandler @RabbitListener(queues = "fanout.A") public void process(String message) { System.out.println("fanout Receiver A : " + message); } } @Component public class FanoutReceiveB { @RabbitHandler @RabbitListener(queues = "fanout.B") public void process(String message) { System.out.println("fanout Receiver B : " + message); } } |
測試
1 2 3 4 5 6 7 8 9 10 11 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class FanoutTest { @Autowired private FanoutSender fanoutSender; @Test public void setFanoutSender() { fanoutSender.send(); } } |
結果
1 2 |
fanout Receiver B : hi, fanout msg fanout Receiver A : hi, fanout msg |
在前面的Fanout
模式下,消息會直接廣播給queue
。若是咱們想讓consumer
處理某些特定的消息,就要讓他接收消息的隊列中沒有其餘類型的消息,因此能不能讓queue
只接收某些消息,而不接收另外一些消息呢?
RabbitMQ 中有一個 Routingkey 的概念。在隊列與交換機的綁定過程當中添加Routingkey
表示queue
接收的消息須要帶有Routingkey
。
Topic
模式和Direct
模式相似,Direct
模式須要Routingkey
徹底匹配而Topic
模式更加靈活,能夠經過通配符進行配置。
- 在這種交換機模式下:路由鍵必須是一串字符,用句號(.) 隔開,例如:topic.A
- 路由模式必須包含一個星號
*
,主要用於匹配路由鍵指定位置的一個單詞,好比說,一個路由模式是這樣子:agreements..b.*,那麼就只能匹配路由鍵是這樣子的:第一個單詞是 agreements,第四個單詞是 b。 井號(#)就表示至關於一個或者多個單詞;例如一個匹配模式是agreements.eu.berlin.#,那麼,以agreements.eu.berlin開頭的路由鍵都是能夠的。
Queue and exchange
另個隊列分別爲 topic.A,topic.B,將他們綁定到 topicExchange 上。而且設置了規則,topic.A 必須是徹底匹配的也就是Direct
模式,topic.B 使用Topic
模式,只要是Rouctingkey
爲 topic 開頭的均可以接收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
@Configuration public class TopicConfig { final static String message = "topic.A"; final static String messages = "topic.B"; @Bean public Queue queueMessage() { return new Queue(TopicConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } } |
生產者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
@Controller public class TopicSend { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message 0"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context); } public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } } |
消費者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Component @RabbitListener(queues = "topic.A") public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver1 : " + message); } } @Component @RabbitListener(queues = "topic.B") public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver2 : " + message); } } |
測試
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class TopicTest { @Autowired private TopicSend sender; @Test public void topic() throws Exception { sender.send(); } @Test public void topic1() throws Exception { sender.send1(); } @Test public void topic2() throws Exception { sender.send2(); } } |
結果
1 2 3 4 5 6 7 |
Sender : hi, i am message 1 Sender : hi, i am messages 2 Sender : hi, i am message 0 Topic Receiver1 : hi, i am message 1 Topic Receiver2 : hi, i am message 1 Topic Receiver2 : hi, i am messages 2 Topic Receiver2 : hi, i am message 0 |
掌握 RabbitMQ 的核心在於如何使用好exchanges
,它有默認模式""
, direct
, topic
, headers
和 fanout
這幾種模式。
經過 RabbitMQ 的 routingkey
能夠過濾交換機傳遞給隊列的消息。fanout
模式下,須要隊列和交換機的routingkey
徹底匹配,而在topic
模式下,能夠經過通配符進行配置,變得更加靈活。
Install RabbitMQ server in CentOS 7
Install Erlang and Elixir in CentOS 7