RabbitMQ 安裝和使用

RabbitMQ 是一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。本文介紹RabbitMQ 安裝和使用。php

RabbitMQ 是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。html

能夠把消息隊列想象成郵局,你的筆友把信件投遞到郵局,郵遞員源源不斷地進出郵局,把筆友的信送到你的手裏。此時的筆友就是一個生產者(Product),郵遞員一次送信就是(Queue),而你收信就像是消費者(Consumer)。java

AMQP

AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。c++

RabbitMQ 則是一個開源的 AMQP 實現。git

Rabbit 概念

一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange)。這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。github

經過 RabbitMQ 官網 的示例中看到 RabbitMQ 有六種模式。spring

RabbitMQ 六種模式

官網中有多種語言的實現,本文用 Java 來實現。採用 Springboot 集成 RabbitMQ。apache


CentOS 安裝 RabbitMQ

安裝 Erlang、Elixir

準備

yum updatecentos

yum install epel-release安全

yum install gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git wget wxBase.x86_64

安裝 Erlang

wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

yum update

yum install erlang

驗證是否安裝成功,輸入命令:erl

安裝 Elixir

由於 EPEL 中的 Elixir 版本太老,因此下面是經過源碼編譯安裝的過程:

git clone https://github.com/elixir-lang/elixir.git

cd elixir/

make clean test

export PATH=」$PATH:/usr/local/elixir/bin」

驗證是否安裝成功,輸入命令:iex

安裝 RabbitMQ

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm

rpm –import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc

yum install rabbitmq-server-3.6.1-1.noarch.rpm

Rabitmq 管理

至此已經安裝完成,下面介紹啓動和自動開機啓動命令和配置

啓動:

systemctl start rabbitmq-server

開機自動啓動:

systemctl enable rabbitmq-server

查看 rabbitmq-server 狀態:

rabbitmqctl status

關閉:

systemctl enable rabbitmq-server

能夠直接經過配置文件的訪問進行管理,也能夠經過Web的訪問進行管理。

經過Web進行管理,開啓 Web 管理:

rabbitmq-plugins enable rabbitmq_management

chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/

注:先啓動 RabbitMQ

訪問:http://192.168.2.223:15672/,默認用戶 guest ,密碼 guest。

發現登陸失敗,因爲帳號guest具備全部的操做權限,而且又是默認帳號,出於安全因素的考慮,guest用戶只能經過localhost登錄使用。

咱們新增一個用戶:

rabbitmqctl add_user admin 123456

rabbitmqctl set_user_tags admin administrator

rabbitmqctl set_permissions -p / admin 「.「 「.「 「.*」

RabbitMQ 管理界面


Springboot 集成 RabbitMQ

假設如今已經按照前面的步驟完成了 RabbitMQ 的安裝,如今開始使用 Springboot 集成 RabbitMQ。

基本配置

IDEA 先新建一個 maven 項目,在 pom 文件中添加相關依賴:

pom 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shuiyujie</groupId>
    <artifactId>pom</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>pom</name>

    <!-- Spring Boot 啓動父依賴 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Test 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
</project>

application.properties

1
2
3
4
5
6
# rabbitmq 配置文件
spring.rabbitmq.host=192.168.0.223
# 默認端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

「Hello World」

HellowWorld.png

如今咱們的目標很簡單就是建立一個生產者 P,和一個消費者 C,同時將 P 產生的消息放到隊列中供 C 使用。

Queue

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
}

HelloSender

1
2
3
4
5
6
7
8
9
10
11
12
@Controller
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

HelloReceiver

1
2
3
4
5
6
7
8
@Component
public class HelloReceiver {
    @RabbitHandler
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}

運行

1
2
3
4
5
6
7
8
9
10
11
12
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloApplication.class)
public class RabbitmqApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void hello() throws Exception {
        helloSender.send();
    }
}

成功接收到消息

1
Receiver  : hello Thu Feb 01 22:21:39 CST 2018

注意:HelloReceiver@RabbitListener(queues = "hello")註解是方法級的,參照別的文章都是類級別的註解致使一直沒法正常鏈接。

Work Queues

Work Queues.png

Work Queues 模式在原來的基礎上多增長了一個消費者。同理咱們能夠擴展三個、四個甚至更多的consumer。這樣作的好處在於,當咱們使用一個consumer的時候,當它收到一條消息進行處理的時候會發生阻塞。有多個consumer時,消息就能夠分發給空閒的consumer進行處理。

生產者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Work 模式下的生產者
 * 
 * @author shui
 * @create 2018-02-04
 **/
@Controller
public class WorkSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(int i) {
        String context = "work ";
        System.out.println("Sender : " + context + "*****" + i);
        this.rabbitTemplate.convertAndSend("work", context);
    }
}

Queue

1
2
3
4
5
6
7
@Configuration
public class WorkConfig {
    @Bean
    public Queue workQueue() {
        return new Queue("work");
    }
}

兩個消費者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class WorkReceicer1 {
    @RabbitHandler
    @RabbitListener(queues = "work")
    public void process(String message) {
        System.out.println("Work Receiver1  : " + message);
    }
}

@Component
public class WorkReceicer2 {
    @RabbitHandler
    @RabbitListener(queues = "work")
    public void process(String message) {
        System.out.println("Work Receiver2  : " + message);
    }
}

測試

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitMQDirectTest {

    @Autowired
    private WorkSender workSender;

    @Test
    public void sendWorkTest() {
        for (int i = 0; i < 20; i++) {
            workSender.send(i);
        }
    }
}

結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver1  : work 
Work Receiver2  : work 
Work Receiver2  : work 
Work Receiver1  : work

發現消費得很平均,每一個consumer處理一半的消息。

public/subscribe

public/subscribe.png

從上面的兩個例子咱們看到producer產生的消息直接發送給queue,而後queue又直接將消息傳給consumer。RabbitMQ 的亮點就在於改變了上面這種消息傳遞的方式,producer不會將消息直接傳給queue而是傳給exchanges再由exchangers傳給queue。然而咱們在前面的兩個例子中並無使用exchanges,那是由於 RabbitMQ 有默認的exchanges,只要咱們傳的參數是""。在默認模式下,不須要將exchanges作任何綁定。除此以外exchanges有如下幾種類型:

  1. Direct:direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的 routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
  2. Topic:按規則轉發消息(最靈活)
  3. Headers:設置header attribute參數類型的交換機
  4. Fanout:轉發消息到全部綁定隊列

Queue

如下使用的是Fanout Exchange轉發消息到全部綁定隊列。這裏要配置兩個queue,而且配置exchanges,並把queueexchanges綁定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 *
 * public/subscribe 模式
 *
 * @author shui
 * @create 2018-02-04
 **/
@Configuration
public class FanoutConfig {

    /************************************************************************
     * 新建隊列 fanout.A 、fanout.B
************************************************************************/

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    /**
     * 創建一個交換機
     *
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /************************************************************************
     * 將 fanout.A 、 fanout.B 綁定到交換機 fanoutExchange 上
************************************************************************/

    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
}

生產者

在建立producter的時候,要將他和exchanges綁定。

1
2
3
4
5
6
7
8
9
10
11
@Controller
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
}

兩個消費者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class FanoutReceiveA {
    @RabbitHandler
    @RabbitListener(queues = "fanout.A")
    public void process(String message) {
        System.out.println("fanout Receiver A  : " + message);
    }
}

@Component
public class FanoutReceiveB {

    @RabbitHandler
    @RabbitListener(queues = "fanout.B")
    public void process(String message) {
        System.out.println("fanout Receiver B  : " + message);
    }
}

測試

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void setFanoutSender() {
        fanoutSender.send();
    }
}

結果

1
2
fanout Receiver B  : hi, fanout msg 
fanout Receiver A  : hi, fanout msg

Routing

routing.png

在前面的Fanout模式下,消息會直接廣播queue。若是咱們想讓consumer處理某些特定的消息,就要讓他接收消息的隊列中沒有其餘類型的消息,因此能不能讓queue只接收某些消息,而不接收另外一些消息呢?

RabbitMQ 中有一個 Routingkey 的概念。在隊列與交換機的綁定過程當中添加Routingkey表示queue接收的消息須要帶有Routingkey

Topic

topic.png

Topic模式和Direct模式相似,Direct模式須要Routingkey徹底匹配而Topic模式更加靈活,能夠經過通配符進行配置。

  1. 在這種交換機模式下:路由鍵必須是一串字符,用句號(.) 隔開,例如:topic.A
  2. 路由模式必須包含一個星號*,主要用於匹配路由鍵指定位置的一個單詞,好比說,一個路由模式是這樣子:agreements..b.*,那麼就只能匹配路由鍵是這樣子的:第一個單詞是 agreements,第四個單詞是 b。 井號(#)就表示至關於一個或者多個單詞;例如一個匹配模式是agreements.eu.berlin.#,那麼,以agreements.eu.berlin開頭的路由鍵都是能夠的。

Queue and exchange

另個隊列分別爲 topic.A,topic.B,將他們綁定到 topicExchange 上。而且設置了規則,topic.A 必須是徹底匹配的也就是Direct模式,topic.B 使用Topic模式,只要是Rouctingkey爲 topic 開頭的均可以接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class TopicConfig {

    final static String message = "topic.A";
    final static String messages = "topic.B";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

生產者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Controller
public class TopicSend {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, i am message 0";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
    }

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}

消費者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@RabbitListener(queues = "topic.A")
public class TopicReceiver {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1  : " + message);
    }
}

@Component
@RabbitListener(queues = "topic.B")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }
}

測試

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class TopicTest {
    @Autowired
    private TopicSend sender;

    @Test
    public void topic() throws Exception {
        sender.send();
    }

    @Test
    public void topic1() throws Exception {
        sender.send1();
    }

    @Test
    public void topic2() throws Exception {
        sender.send2();
    }
}

結果

1
2
3
4
5
6
7
Sender : hi, i am message 1
Sender : hi, i am messages 2
Sender : hi, i am message 0
Topic Receiver1  : hi, i am message 1
Topic Receiver2  : hi, i am message 1
Topic Receiver2  : hi, i am messages 2
Topic Receiver2  : hi, i am message 0

總結

掌握 RabbitMQ 的核心在於如何使用好exchanges,它有默認模式"" , direct , topic , headersfanout 這幾種模式。

經過 RabbitMQ 的 routingkey 能夠過濾交換機傳遞給隊列的消息。fanout 模式下,須要隊列和交換機的routingkey徹底匹配,而在topic模式下,能夠經過通配符進行配置,變得更加靈活。

安裝參考:

Install RabbitMQ server in CentOS 7

CentOS 7 下安裝 RabbitMQ

Install Erlang and Elixir in CentOS 7

rabbitmq——用戶管理

Springboot 集成 RabbitMQ 參考

RabbitMQ Tutorials

Spring Boot 中使用 RabbitMQ

springboot rabbitmq整合

Spring Boot系列(八):RabbitMQ詳解

相關文章
相關標籤/搜索