Springboot 集成rabbitmq

     RabbitMQ是一款基於AMQP(消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、Erlang等。
對於RabbitMQ來講,生產者和消息隊列之間存在隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列。消費者經過讀取消息隊列從而實現消息的發送和接收。html


   交換機的主要做用是接收相應的消息而且綁定到指定的隊列.交換機有四種類型,分別爲Direct,topic,headers,Fanout.git

  Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即建立消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中.github

  topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息纔會被髮送到該消息隊列中.web

  headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中.spring

  Fanout是路由廣播的形式,將會把消息發給綁定它的所有隊列,即使設置了key,也會被忽略.c#

消息隊列兩個用處:服務間解耦,緩解壓力(削峯平谷);
RabbitMQ實現了AQMP協議,AQMP協議定義了消息路由規則和方式。生產端經過路由規則發送消息到不一樣queue,消費端根據queue名稱消費消息。
RabbitMQ既支持內存隊列也支持持久化隊列,消費端爲推模型,消費狀態和訂閱關係由服務端負責維護,消息消費完後當即刪除,不保留歷史消息。
(1)點對點
生產端發送一條消息經過路由投遞到Queue,只有一個消費者能消費到。springboot

(2)多訂閱
當RabbitMQ須要支持多訂閱時,發佈者發送的消息經過路由同時寫到多個Queue,不一樣訂閱組消費不一樣的Queue。因此支持多訂閱時,消息會多個拷貝。服務器

二、安裝RabbitMQ服務端
    (1)下載Erlang安裝包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度網盤地址)
    (2)安裝和配置RabbitMQ服務端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度網盤地址)
        (官方:安裝Erland,經過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接打開並完成安裝。
         安裝RabbitMQ,經過官方下載頁面https://www.rabbitmq.com/download.html獲取exe安裝包。)
    (3)啓用web管理插件:rabbitmq-plugins enable rabbitmq_management
    (4)啓動RabbitMQ:chkconfig rabbitmq-server on  /sbin/service rabbitmq-server start
    (5)防火牆開通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默認會建立guest帳號,只能用於localhost登陸頁面管理員,本機訪問地址:http://localhost:15672/app

三.SpringBoot整合RabbitMQ(Topic轉發模式)tcp

  首先咱們看發送端,咱們須要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:


首先 配置pom.xml
    <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

接着在application.properties中,去編輯和RabbitMQ相關的配置信息,配置信息的表明什麼內容根據鍵就能很直觀的看出了.這裏端口是5672,不是15672...15672是管理端的端口!

server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

咱們看發送端,咱們須要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:

package com.example.demo.conf;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by ningcs on 2017/10/30.
 */
@Configuration
public class SenderConf {
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞
    }
}

 

在SpringBoot中,咱們使用AmqpTemplate去發送消息!代碼以下:

package com.example.demo.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void send(String msg) {
        System.out.println(msg);
        template.convertAndSend("exchange","topic.message","hello,rabbit~");
    }
}

測試

package com.example.demo.controller;

import com.example.demo.sender.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by ningcs on 2017/10/30.
 */
@RestController
@RequestMapping("rabbit")
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

    @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST})
    public String helloSender(){
        helloSender.send("hello,rabbit~");
        return "發送成功";
    }
}

接收端(能夠放在兩個項目,配置文件同樣):
package com.example.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloReceive {

//    @RabbitListener(queues="queue")    //監聽器監聽指定的Queue
//    public void processC(String str) {
//        System.out.println("Receive:"+str);
//    }

    @RabbitListener(queues="topic.message")    //監聽器監聽指定的Queue
    public void process1(String str) {
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //監聽器監聽指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

}

訪問地址:http://localhost:17080/rabbit/hello
控制檯輸出如下日誌說明成功:
2017-11-02 11:30:11.982  INFO 4476 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994  INFO 4476 --- [           main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712  INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-11-02 11:31:06.713  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-11-02 11:31:06.729  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~

2017-11-02 11:30:13.165  INFO 11064 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170  INFO 11064 --- [           main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~

Fanout 和Direct模式詳見最下面github地址。

術語解釋

Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
RabbitMQ消息隊列詳細介紹(主要涉及術語)
http://blog.csdn.net/leyangjun/article/details/52529047

通訊協議AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概以下:
客戶端鏈接到消息隊列服務器,打開一個channel。
客戶端聲明一個exchange,並設置相關屬性。
客戶端聲明一個queue,並設置相關屬性。
客戶端使用routing key,在exchange和queue之間創建好綁定關係。
客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。

RabbitMQ消息隊列-RabbitMQ的優劣勢及產生背景
推薦博客:
http://blog.csdn.net/super_rd/article/details/70229714

詳細代碼訪問github:https://github.com/ningcs/Springboot-rabbit-mq

相關文章
相關標籤/搜索