ErLang
語言開發有高可用高併發的優勢,適合集羣。Producer
:消息的生產者Consumer
:消息的消費者FIFO
的處理機制,具備緩存消息的能力。在 RabbitMQ
中,隊列消息能夠設置爲持久化,臨時或者自動刪除。Queue
中的消息會在 Server
本地硬盤存儲一份,防止系統 Crash
數據丟失。Queue
中的數據在系統重啓以後就會丟失。Server
,隊列中的數據會被自動刪除。ExChange
相似於數據通訊網絡中的交換機,提供消息路由策略。java
在 RabbitMQ
中,生產者不是將消息直接發送給 Queue
,而是先發送給 ExChange
,ExChange
根據生產者傳遞的 key
按照特定的路由算法將消息給指定的 Queue
。一個 ExChange
能夠綁定多個 Queue
。和 Queue
同樣,ExChange
也能夠設置爲持久化、臨時或者自動刪除。算法
所謂綁定就是將一個特定的 ExChange
和一個特定的 Queue
綁定起來。ExChange
和 Queue
的綁定能夠是多對多的關係。spring
在 RabbitMQ Server
上能夠建立多個虛擬的 Message Broker
(又叫作 Virtual Hosts
)。每個 vhost
本質上是一個迷你的 RabbitMQ Server
,分別管理各自的 ExChange
和 binding
。生產者和消費者鏈接 RabbitMQ Server
須要指定一個 Virtual Host
。docker
Channel
。ExChange
,並設置相關屬性。Queue
,並設置相關屬性。Routing Key
,在 ExChange
和 Queue
之間創建好綁定關係。ExChange
。ExChange
接收到消息後,就根據消息的 key
和已經設置的 bingding
,進行消息路由,將消息投遞到一個或多個隊列裏。建立 docker-compose.yml
緩存
version: '3.1' services: rabbitmq: restart: always image: rabbitmq:management container_name: rabbitmq ports: - 5672:5672 - 15672:15672 environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: rabbit RABBITMQ_DEFAULT_PASS: 123456 volumes: - ./data:/var/lib/rabbitmq
訪問地址:http://{ip}:15672服務器
首頁網絡
Global counts
頁併發
交換機頁app
隊列頁ide
Name
:消息隊列的名稱,這裏是經過程序建立的
Features
:消息隊列的類型,durable:true 爲會持久化消息
Ready
:準備好的消息
Unacked
:未確認的消息
Total
:所有消息若是都爲 0 則說明所有消息處理完成
建立一個名爲 spring-boot-amqp-provider
的生產者項目。
建立 application.yml 文件
spring: application: name: spring-boot-amqp rabbitmq: host: 192.168.75.133 port: 5672 username: rabbit password: 123456
建立隊列
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 隊列配置 */ @Configuration public class RabbitMQConfiguration { @Bean public Queue queue() { return new Queue("helloRabbitMQ"); } }
建立消息提供者
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 消息提供者 */ @Component public class RabbitMQProvider { @Autowired private AmqpTemplate amqpTemplate; public void send() { String context = "hello" + new Date(); System.out.println("Provider: " + context); amqpTemplate.convertAndSend("helloRabbitMQ", context); } }
建立測試用例
import com.lusifer.spring.boot.amqp.Application; import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class AmqpTest { @Autowired private HelloRabbitProvider helloRabbitProvider; @Test public void testSender() { for (int i = 0; i < 10; i++) { RabbitMQProvider.send(); } } }
建立一個名爲 spring-boot-amqp-consumer
的消費者項目。
建立 application.yml
文件
spring: application: name: spring-boot-amqp-consumer rabbitmq: host: 192.168.75.133 port: 5672 username: rabbit password: 123456
建立消息的消費監聽組件
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "helloRabbitMQ") public class HelloRabbitConsumer { @RabbitHandler public void process(String message) { System.out.println("Consumer: " + message); } }
文章做者:彭超
本文首發於我的博客:[https://antoniopeng.com/2020/07/18/mq/%E6%B7%B1%E5%85%A5%E6%B5%85%E5%87%BA%20RabbitMQ/](https://antoniopeng.com/2020/07/18/mq/深刻淺出 RabbitMQ/)
- 版權聲明:本博客全部文章除特別聲明外,均採用 CC BY-NC-SA 4.0 許可協議。轉載請註明來自 彭超 | Blog!