RabbitMQ實戰:運行和管理RabbitMQ

本系列是「RabbitMQ實戰:高效部署分佈式消息隊列」書籍的總結筆記。java

上一篇 介紹了AMQP消息通訊,包括隊列、交換器和綁定,經過虛擬主機還能夠隔離數據和權限,消息持久化和發送方確認模式確保了消息不丟失。spring

本篇主要介紹如何運行和管理RabbitMQ,在介紹以前,會有個DEMO演示消息發送和接收,一方面對AMQP的元素有更直觀的認識,一方面爲後面介紹監控作數據來源。shell

經過介紹,你會了解到:數組

  • 消息發送和接收簡單實現
  • 服務器管理-啓動和中止節點
  • 權限配置
  • 使用統計

福利傳送門:小愛音箱F碼bash

消息發送和接收簡單實現

該Demo主要用於收集日誌,消息發送者是各個應用子系統,消息接收者是日誌收集服務,使用RabbitMQ能夠很容易實現。服務器

基於Spring Boot框架實現,主要類的做用以下:微信

  • LogRabbitConfig:建立隊列、交換器、綁定等初始化操做;
  • Sender:消息發送者;
  • AllReceiver:全部級別日誌接收者,接收全部級別的日誌;
  • ErrorReceiver:錯誤級別日誌接受者,只接收錯誤級別的日誌;
  • LogSenderTest:測試用例類;

消息模型以下: 數據結構

Demo的消息模型

配置

首先,配置spring boot和rabbitmq依賴:app

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.0.1.RELEASE</version>
	<relativePath/>
</parent>
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<!--rabbitmq依賴-->
	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
複製代碼

而後在application.properties文件中配置rabbitmq地址:框架

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
複製代碼
LogRabbitConfig實現

使用Spring的@Configuration定義配置類,可替換xml配置文件,被註解的類內部包含有一個或多個被@Bean註解的方法,用於構建bean定義,初始化Spring容器。

@Configuration
public class LogRabbitConfig {
    final static String QUEUE_LOG_ERROR = "log.error";
    final static String QUEUE_LOG_ALL = "log.all";

    //建立log.error隊列
    @Bean
    public Queue logError() {
        return new Queue(QUEUE_LOG_ERROR);
    }
    //建立log.all隊列
    @Bean
    public Queue logAll() {
        return new Queue(QUEUE_LOG_ALL);
    }
    //建立exchange,命名爲log
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("log");
    }
    //綁定log.error隊列到exchange,routingkey爲log.error
    @Bean
    Binding bindingExchangeError(Queue logError, TopicExchange exchange) {
        return BindingBuilder.bind(logError).to(exchange).with("log.error");
    }
    //綁定log.all隊列到exchange,routingkey爲log.#
    @Bean
    Binding bindingExchangeAll(Queue logAll, TopicExchange exchange) {
        return BindingBuilder.bind(logAll).to(exchange).with("log.#");
    }
}
複製代碼
Sender實現

各個子系統向rabbitmq服務器發送消息:

@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send() {
        //向mq服務端發送消息,exchange爲log,routingkey爲log.error
        String context = "error log";
        this.rabbitTemplate.convertAndSend("log", "log.error", context);

        //向mq服務端發送消息,exchange爲log,routingkey爲log.info
        context = "info log";
        System.out.println("send msg : " + context);
        this.rabbitTemplate.convertAndSend("log", "log.info", context);

        //向mq服務端發送消息,exchange爲log,routingkey爲log.warn
        context = "warn log";
        System.out.println("send msg : " + context);
        this.rabbitTemplate.convertAndSend("log", "log.warn", context);
    }

}
複製代碼
AllReceiver和ErrorReceiver實現

從rabbitmq服務器接收消息。

AllReceiver從服務器的log.all隊列獲取消息,由於它綁定的routingkey爲"log.#",因此,會收到全部級別的日誌:

@Component
@RabbitListener(queues = "log.all")
public class AllReceiver {
    @RabbitHandler
    public void process(String context) {
        System.out.println("receive log : " + context);
    }
}
複製代碼

ErrorReceiver從服務器的log.error隊列獲取消息,由於它綁定的routingkey爲"log.error",因此,只會收到error級別的日誌:

@Component
@RabbitListener(queues = "log.error")
public class ErrorReceiver {
    @RabbitHandler
    public void process(String context) {
        System.out.println("receive error : " + context);
    }
}
複製代碼
LogSenderTest測試用例

測試用例很簡單,就是調用Sender發送消息,觀察消息的接收狀況。

@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
public class LogSenderTest {
    @Autowired
    Sender sender;

    @Test
    public void sendLog() {
        sender.send();
    }
}
複製代碼

運行日誌以下:

運行日誌

能夠看到,error收到了2次,說明exchange同時分發給了log.all和log.error隊列,其餘級別的日誌分發給了log.all隊列。

服務器管理-啓動和中止節點

RabbitMQ是用Erlang編寫的,Erlang天生就能讓應用程序無需知道對方是否在同一臺機器上便可相互通訊,這讓集羣和可靠的消息路由變得簡單。

理解節點和Erlang應用程序

和Java有JVM虛擬機相似,Erlang也有虛擬機,虛擬機的每一個實例稱之爲「節點」,不一樣的是,多個Erlang應用程序能夠運行在同一個節點之上,若是應用程序崩潰了,Erlang節點會自動嘗試自動重啓應用程序。

節點的操做:

後臺啓動節點:./rabbitmq-server -detached
中止節點:./rabbitmqctl stop 
僅中止rabbit應用程序:./rabbitmqctl stop_app
複製代碼
配置文件

配置文件的格式本質上是原始的Erlang數據結構,是一個包含了嵌套哈希表的數組,以下:

[
    [mnesia , [{dump_log_write_threshold , 1000}]],
    [rabbit , [{vm_memory_high_wateremark , 0.4}]]
]
複製代碼

上面配置了2個應用,每一個應用會有本身的哈希表來配置選項:

  • mnesia:是rabbitmq用來存儲交換器和隊列元數據的;
  • rabbit:是rabbitmq特定的配置選項;

每一個應用若是有多個選項,用逗號隔開。

權限配置

RabbitMQ權限系統中,單個用戶能夠跨越多個vhost進行受權,並且能夠對讀、寫、配置分別受權。

首先建立一個用戶dongqingqing,密碼爲123456:

./rabbitmqctl add_user dongqingqing 123456
複製代碼

授予dongqingqing用戶權限,能夠讀取全部隊列和交換器,只可寫log.*格式的隊列和交換器,沒法建立或刪除隊列和交換器

./rabbitmqctl set_permissions  dongqingqing ".*" "log.*" "" 
複製代碼

set_permissions 後面的參數分別爲用戶名、讀權限、寫權限、配置權限。

其餘詳細用法可查看文檔。

使用統計

查看數據統計

可經過rabbitmqctl命令查看數據統計信息,好比隊列和消息數目、交換器和綁定等。

查看全部隊列,包含上面demo定義的log.all和log.error:

查看全部隊列

查看全部交換器,包含上面demo定義的log

查看全部交換器

另外,rabbitmq提供了管理界面插件,更方便的查看各類統計,能夠經過下面的命令開啓:

sudo ./rabbitmq-plugins enable rabbitmq_management
複製代碼

rabbitmq_management插件

查看日誌

能夠在文件系統中查看日誌,啓動rabbitmq後,會顯示日誌的路徑:

rabbitmq日誌路徑

另外,能夠經過AMQP獲取實時日誌信息,有一個amq.rabbitmq.log的topic交換器,監聽對應的隊列便可。

下一篇將介紹消息通訊模式和最佳實踐,感謝你們持續關注。

歡迎掃描下方二維碼,關注個人我的微信公衆號 ~

情情說
相關文章
相關標籤/搜索