SpringBoot整合RabbitMq

1,該筆記主要是記錄本身學習Springboot整合RabbitMq過程,推薦一篇學習RabbitMq很是好的博客:http://blog.720ui.com/2017/rabbitmq_action_01_helloworld/spring

2,RabbitMq簡介:AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。 
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。安全

3,基本概念介紹:服務器

,網絡

3.1.Message消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。app

3.2.Publisher消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。分佈式

3.3.Exchange交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。ide

3.4.Binding綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。spring-boot

3.5.Queue消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。學習

3.6.Connection網絡鏈接,好比一個TCP鏈接。測試

3.7.Channel信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。

3.8.Consumer消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

3.9.Virtual Host虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。

3.10.Broker表示消息隊列服務器實體。

4,交換器簡介:

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。下面只講前三種模式。

4.1.Direct模式

消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配

4.2.Topic模式

topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「*」。#匹配0個或多個單詞,*匹配一個單詞。

4.3.Fanout模式

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

5,SpringBoot整合RabbitMq:

5.1,添加依賴:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

5.2,在application.yml中配置RabbitMq

spring
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

5.3,定義message

@Data
public class UserInfo implements Serializable {
    private Integer UserId;
    private String username;

    @Override
    public String toString() {
        return "UserInfo{" +
                "UserId=" + UserId +
                ", username='" + username + '\'' +
                '}';
    }
}

5.4,配置消息隊列

@Configuration
public class QueueConfig {
    static final String QUEUE = "direct_queue";
    //direct模式
    @Bean
    public Queue directQueue(){
        return new Queue(QUEUE,true);
    }
}

5.5,生產者

@Component
@Slf4j
public class Sender {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(){
        UserInfo user = new UserInfo();
        user.setUserId(11);
        user.setUsername("hello word");
        log.info("send message to direct_queue,message is {}",user.toString());
        this.amqpTemplate.convertAndSend(QueueConfig.QUEUE,user);
    }
}

5.6,消費者

@Component
@Slf4j
public class Receiver {
    //在隊列上進行監聽
    @RabbitListener(queues = QueueConfig.QUEUE)
    public void receiverMessage(UserInfo userInfo){
     log.info("direct_queue get message is {}",userInfo.toString());
    }
}

5.7,測試

@RestController
public class MQControllerTest {
    @Autowired
    private Sender sender;

    @PostMapping(value = "/send/message")
    public String sendMessage(){
        sender.sendMessage();
        return "ok";

    }

}

6,其餘的交換機模式,這裏只給出消息隊列的配置,生產者與消費者同上面的相差無幾

 6.1,Topic模式

@Configuration
public class TopicQueueConfig {
//聲明兩個隊列,一個交換機
static final String TOPIC_QUEUE1 = "topic_1"; static final String TOPIC_QUEUE2 = "topic_2"; static final String TOPIC_EXCHANGE = "top.exchange"; @Bean public Queue topicQueue1(){ return new Queue(TOPIC_QUEUE1); } @Bean public Queue topicQueue2(){ return new Queue(TOPIC_QUEUE2); } @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPIC_EXCHANGE); } //將兩個隊列分別綁定到交換機上 @Bean public Binding topBinding1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("lzc.message"); } @Bean public Binding topBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lzc.#"); } }

6.2,Fanout模式

@Configuration
public class FanoutQueueConfig {
//聲明兩個隊列,一個交換機
static final String TOPIC_QUEUE1 = "topic_1"; static final String TOPIC_QUEUE2 = "topic_2"; static final String FOUNT_EXCHANGE = "fount.exchange"; @Bean public Queue topicQueue1(){ return new Queue(TOPIC_QUEUE1); } @Bean public Queue topicQueue2(){ return new Queue(TOPIC_QUEUE2); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FOUNT_EXCHANGE); } //將隊列綁定到交換機上,此種模式只用將消息發送到交換機,交換機就會將消息路由到全部與之綁定的消息隊列裏 @Bean public Binding topBinding1(){ return BindingBuilder.bind(topicQueue1()).to(fanoutExchange()); } @Bean public Binding topBinding2(){ return BindingBuilder.bind(topicQueue2()).to(fanoutExchange()); } }
相關文章
相關標籤/搜索