如今微服務盛行, 咱們一般會進行解耦, 這時候就須要異步的消息隊列來幫助各個服務之間解耦spring
rabbitmq的基本概念有消息producer(消息生產者)、exchange(交換機)、queue(隊列)、consumer(消費者)、routingKeysegmentfault
(圖中的P是producer, 即消息生產者, 中簡的Server是Exchange(交換機) 和 Queue(隊列))瀏覽器
queue是存放消息的隊列, 實際上就是一個存放消息數據結構爲隊列的一個容器數據結構
咱們可能會簡單的覺得發送者會把消息發送到隊列中, 而後消費者對隊列進行監聽。事實上, 消息發送者永遠不會將消息直接發送到隊列中, 而是將消息發送到exhang中, 再由exchange經過必定的路由規則路由到對應的消息隊列中。
交換機有四種類型:異步
Direct Topic Headers Fanout Direct Exchange:
在上面介紹exchange中說到消息經過必定的路由規則路由到對應的隊列中, routingKey就是起着這樣的一個做用,一般咱們發送消息到exchane中的時候會攜帶一個routingKey, 而這個routingKey就是exchange和queue綁定的一個規則, 由此即可以將消息從exchange再發送到對應的queue上spring-boot
參考文章https://segmentfault.com/a/11...微服務
首先添加如下依賴:spa
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置項以下:3d
spring: rabbitmq: port: 5672 password: guest username: guest host: localhost listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 1 retry: enabled: true
在瀏覽器輸入http://localhost:15672/, 在mq上咱們新建了一個名爲exchange1, routingKey爲exhcange1-queue1的exchange, 而且映射到名爲queue1的隊列, code
發送消息代碼Prodcuer:
public class Sender{ @Autowired private RabbitTemplate rabbitTemplate; public void send(Object object) { CorrelationData correlationData = new CorrelationData(); correlationData.setId("exchange1-queue1-id"); String message = "hello world"; rabbitTemplate.convertAndSend("exchange1", "exchange1-queue1", "helloworld", new CorrelationData()); } }
在上面的代碼中咱們發送了一個消息到名爲"exchange1", routingKey爲"exchange1-queue1"的消息。咱們啓動rabbitmq。
發送後能夠在mq上看到以下圖已經成功發送了。
接下來貼上接受消息的receiver代碼:
@Component public class Receiver { @RabbitListener(queues = "queue1") public void receive(Message message, Channel channel) { try { System.out.println(message.getBody()); } catch (Exception e) { e.printStackTrace(); } } }
接受消息後再看mq上以下圖:
能夠看出queue1上Ready一欄是0,可是Unacked一欄和Total一欄依然有消息, 這是由於咱們再配置文件中設置的是手動的ack,這時候代碼中沒有進行ack, mq會認爲消費者沒有成功消費掉這條消息, 這時候就須要在配置文件中設置成自動ack, 或者在代碼中手動進行ack,在消費者後添加以下代碼:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);