本系列是「RabbitMQ實戰:高效部署分佈式消息隊列」書籍的總結筆記。java
上一篇 介紹了AMQP消息通訊,包括隊列、交換器和綁定,經過虛擬主機還能夠隔離數據和權限,消息持久化和發送方確認模式確保了消息不丟失。spring
本篇主要介紹如何運行和管理RabbitMQ,在介紹以前,會有個DEMO演示消息發送和接收,一方面對AMQP的元素有更直觀的認識,一方面爲後面介紹監控作數據來源。shell
經過介紹,你會了解到:數組
福利傳送門:小愛音箱F碼bash
該Demo主要用於收集日誌,消息發送者是各個應用子系統,消息接收者是日誌收集服務,使用RabbitMQ能夠很容易實現。服務器
基於Spring Boot框架實現,主要類的做用以下:微信
消息模型以下: 數據結構
首先,配置spring boot和rabbitmq依賴:app
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--rabbitmq依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
複製代碼
而後在application.properties文件中配置rabbitmq地址:框架
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
複製代碼
使用Spring的@Configuration定義配置類,可替換xml配置文件,被註解的類內部包含有一個或多個被@Bean註解的方法,用於構建bean定義,初始化Spring容器。
@Configuration
public class LogRabbitConfig {
final static String QUEUE_LOG_ERROR = "log.error";
final static String QUEUE_LOG_ALL = "log.all";
//建立log.error隊列
@Bean
public Queue logError() {
return new Queue(QUEUE_LOG_ERROR);
}
//建立log.all隊列
@Bean
public Queue logAll() {
return new Queue(QUEUE_LOG_ALL);
}
//建立exchange,命名爲log
@Bean
TopicExchange exchange() {
return new TopicExchange("log");
}
//綁定log.error隊列到exchange,routingkey爲log.error
@Bean
Binding bindingExchangeError(Queue logError, TopicExchange exchange) {
return BindingBuilder.bind(logError).to(exchange).with("log.error");
}
//綁定log.all隊列到exchange,routingkey爲log.#
@Bean
Binding bindingExchangeAll(Queue logAll, TopicExchange exchange) {
return BindingBuilder.bind(logAll).to(exchange).with("log.#");
}
}
複製代碼
各個子系統向rabbitmq服務器發送消息:
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
//向mq服務端發送消息,exchange爲log,routingkey爲log.error
String context = "error log";
this.rabbitTemplate.convertAndSend("log", "log.error", context);
//向mq服務端發送消息,exchange爲log,routingkey爲log.info
context = "info log";
System.out.println("send msg : " + context);
this.rabbitTemplate.convertAndSend("log", "log.info", context);
//向mq服務端發送消息,exchange爲log,routingkey爲log.warn
context = "warn log";
System.out.println("send msg : " + context);
this.rabbitTemplate.convertAndSend("log", "log.warn", context);
}
}
複製代碼
從rabbitmq服務器接收消息。
AllReceiver從服務器的log.all隊列獲取消息,由於它綁定的routingkey爲"log.#",因此,會收到全部級別的日誌:
@Component
@RabbitListener(queues = "log.all")
public class AllReceiver {
@RabbitHandler
public void process(String context) {
System.out.println("receive log : " + context);
}
}
複製代碼
ErrorReceiver從服務器的log.error隊列獲取消息,由於它綁定的routingkey爲"log.error",因此,只會收到error級別的日誌:
@Component
@RabbitListener(queues = "log.error")
public class ErrorReceiver {
@RabbitHandler
public void process(String context) {
System.out.println("receive error : " + context);
}
}
複製代碼
測試用例很簡單,就是調用Sender發送消息,觀察消息的接收狀況。
@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
public class LogSenderTest {
@Autowired
Sender sender;
@Test
public void sendLog() {
sender.send();
}
}
複製代碼
運行日誌以下:
能夠看到,error收到了2次,說明exchange同時分發給了log.all和log.error隊列,其餘級別的日誌分發給了log.all隊列。
RabbitMQ是用Erlang編寫的,Erlang天生就能讓應用程序無需知道對方是否在同一臺機器上便可相互通訊,這讓集羣和可靠的消息路由變得簡單。
和Java有JVM虛擬機相似,Erlang也有虛擬機,虛擬機的每一個實例稱之爲「節點」,不一樣的是,多個Erlang應用程序能夠運行在同一個節點之上,若是應用程序崩潰了,Erlang節點會自動嘗試自動重啓應用程序。
節點的操做:
後臺啓動節點:./rabbitmq-server -detached
中止節點:./rabbitmqctl stop
僅中止rabbit應用程序:./rabbitmqctl stop_app
複製代碼
配置文件的格式本質上是原始的Erlang數據結構,是一個包含了嵌套哈希表的數組,以下:
[
[mnesia , [{dump_log_write_threshold , 1000}]],
[rabbit , [{vm_memory_high_wateremark , 0.4}]]
]
複製代碼
上面配置了2個應用,每一個應用會有本身的哈希表來配置選項:
每一個應用若是有多個選項,用逗號隔開。
RabbitMQ權限系統中,單個用戶能夠跨越多個vhost進行受權,並且能夠對讀、寫、配置分別受權。
首先建立一個用戶dongqingqing,密碼爲123456:
./rabbitmqctl add_user dongqingqing 123456
複製代碼
授予dongqingqing用戶權限,能夠讀取全部隊列和交換器,只可寫log.*格式的隊列和交換器,沒法建立或刪除隊列和交換器
./rabbitmqctl set_permissions dongqingqing ".*" "log.*" ""
複製代碼
set_permissions 後面的參數分別爲用戶名、讀權限、寫權限、配置權限。
其餘詳細用法可查看文檔。
可經過rabbitmqctl命令查看數據統計信息,好比隊列和消息數目、交換器和綁定等。
查看全部隊列,包含上面demo定義的log.all和log.error:
查看全部交換器,包含上面demo定義的log
另外,rabbitmq提供了管理界面插件,更方便的查看各類統計,能夠經過下面的命令開啓:
sudo ./rabbitmq-plugins enable rabbitmq_management
複製代碼
能夠在文件系統中查看日誌,啓動rabbitmq後,會顯示日誌的路徑:
另外,能夠經過AMQP獲取實時日誌信息,有一個amq.rabbitmq.log的topic交換器,監聽對應的隊列便可。
下一篇將介紹消息通訊模式和最佳實踐,感謝你們持續關注。
歡迎掃描下方二維碼,關注個人我的微信公衆號 ~