一文看懂Rabbitmq,從安裝到實戰演練

Rabbitmq的初步使用

隨着微服務概念發展,大應用逐步拆分爲小應用,提升開發效率,專門的人作專門的事情,逐漸的流行起來。git

在微服務上實現通訊的方式大部分是採用rpc方式,也有升級版本的grpc。github

還有另一種實現就是使用mq來進行解耦。spring

今天初識mq,快速入門先,準備一個環境實現案例,該文涉及如下內容:docker

  • 安裝rabbitmq
  • mq能解決的問題
  • 實戰演練

安裝

rabbitmq的安裝咱們採用docker的方式,docker方便咱們快速的實現rabbitmq的安裝,不須要再對安裝mq進行頭疼。微信

docker 的兩種方式併發

docker方式

//拉取mq鏡像
docker pull rabbitmq
//啓動mq
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9

複製代碼

說明:app

  1. -d 後臺運行容器;
  2. --name 指定容器名;
  3. -p 指定服務運行的端口(5672:應用訪問端口;15672:控制檯Web端口號);
  4. -v 映射目錄或文件;
  5. --hostname 主機名(RabbitMQ的一個重要注意事項是它根據所謂的 「節點名稱」 存儲數據,默認爲主機名);
  6. -e 指定環境變量;(RABBITMQ_DEFAULT_VHOST:默認虛擬機名;RABBITMQ_DEFAULT_USER:默認的用戶名;RABBITMQ_DEFAULT_PASS:默認用戶名的密碼)

docker-compose 方式

version: "3"
services:
 rabbit:
 image: docker.infervision.com/library/rabbitmq:3-management
 ports:
 - "4369:4369"
 - "5671:5671"
 - "5672:5672"
 - "15671:15671"
 - "15672:15672"
 restart: always
 environment:
 - RABBITMQ_DEFAULT_USER=test
 - RABBITMQ_DEFAULT_PASS=test
 volumes:
 - /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
 container_name: rabbitmq

在該文件目錄下執行:docker-compose up -d
複製代碼

下載的rabbitmq內置管理界面,ip:15672 用戶名與密碼是咱們在啓動是寫入的。 異步

2019-06-12-17-31-25

mq能解決什麼?

通俗的來講,主要使用MQ來解決如下三個問題。ide

異步消息

在業務中,常常會遇到同時發送郵件,短信或者其餘通知內容服務。業務初期,採用同步或者異步處理方式都須要等發送完畢後再返回給客戶端。中間有必定的延遲spring-boot

2019-06-12-17-49-08

業務增加後,此方式系統性能就會形成很大的浪費。採用消息隊列,將這幾個服務進行解耦,只需將消息內容發送到消息隊列中,下降用戶的等待時間,體驗效果比原先好不少。

2019-06-12-17-49-27

應用間解耦

同一個服務中可能須要其餘服務的配合才能完成一項業務操做.仍是拿常見的購物案例來講明。

在京東下單支付後,消息要通知到商家,郵件通知用戶已經購買某商品。

若是這兩種操做都採用同步執行,用戶等待時間會變長。

採用mq方式以後,訂單系統將消息持久化到mq上,返回給用戶下單成功。

  • 商家接收到用戶的下單信息,進行處理,若是有庫存管理那麼須要進行庫存處理。
  • 郵件通知用戶,告知用戶下單成功。

mq保證消息的可靠投遞,不會致使消息丟失,保證消息的高可靠性。若是庫存出現失敗也不會致使用戶下單失敗的狀況,能夠從新進行投遞。

流量削峯

流量削峯,通常是同一時間涌進來不少請求,後臺處理不過來。那麼須要採用削峯方式來處理。

簡單來講是經過一個隊列承接瞬時過來流量洪峯,在消費端平滑的將消息推送出去,若是消費者消費不及時能夠將消息內容持久化在隊列中,消息不存在丟失。

  1. 消費端不及時進行消費,還能夠動態的擴增消費者數量,提升消費速度。
  2. 設定相關的閥值,多餘的消息直接丟棄,告知用戶秒殺失敗等業務消息內容。

摘自簡書

實戰案例

本文是按照Java語言進行,使用Spring boot搭建,包管理工具Gradle。

導入rabbitmq jar包

compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")
複製代碼

配置mq

yaml 文件配置

spring:
 rabbitmq:
 host: 192.168.110.5
 port: 5672
 username: tuixiang
 password: tuixiang
複製代碼

準備好模板類,供後面直接使用

package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** * @author: fruiqi * @date: 19-2-18 下午2:42 * @version:1.0 rabbit配置 **/
@Configuration
public class RabbitConfig {

    /** * 日誌 **/
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);


    @Value("${spring.rabbitmq.username}")
    String userName;

    @Value("${spring.rabbitmq.password}")
    String userPassword;

    @Value("${spring.rabbitmq.host}")
    String host;

    @Value("${spring.rabbitmq.port}")
    Integer port;

    /** * 注入 * * @param * @return com.rabbitmq.client.Connection * @author fruiqi * @date 19-1-22 下午5:41 **/
    @Bean
    public ConnectionFactory getConnection() throws Exception {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(userName);
        factory.setPassword(userPassword);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }


    /** * 建立制定的 監聽容器 * * @param queueName 監聽的隊列名字 * @param listenerChannel 設置是否將監聽的頻道 公開給已註冊的 * @param PrefetchCount 告訴代理一次請求多少條消息過來 * @param ConcurrentConsumers 制定建立多少個併發的消費者數量 * @param acknowledgeMode 消息確認模式 * @param listener 監聽器 * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer **/
    public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel, int PrefetchCount, int ConcurrentConsumers, AcknowledgeMode acknowledgeMode, ChannelAwareMessageListener listener) throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
        container.setQueueNames(queueName);
        container.setExposeListenerChannel(listenerChannel);
        container.setPrefetchCount(PrefetchCount);
        container.setConcurrentConsumers(ConcurrentConsumers);
        container.setAcknowledgeMode(acknowledgeMode);
        container.setMessageListener(listener);
        return container;
    }
}


package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * @author: fruiqi * @date: 19-2-18 下午2:51 * @version:1.0 **/
@Component
public class MsgSender {


    private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** * @param exchange 交換機名稱 * @param routingKey 路由名稱 * @param message 消息內容 * @return void * @description //TODO 發送消息到消息隊列中 **/
    public void sendMsg(String exchange, String routingKey, Object message) {
        try {
            rabbitTemplate.convertAndSend(exchange,routingKey,message);
        }catch (Exception e){
            logger.error("[ERROR] send statistic message error ",e);
        }
    }

}
複製代碼

實例連接mq

在使用rabbitmq 有的時候須要本身客戶端建立queue,但有的時候並非本身建立,在rabbitmq頁面上進行建立queue,其餘消費者直接引用。

客戶端建立mq

//初始化隊列,若是隊列已存在,則不做任何處理 若是有權限控制以下操做並不能實現
    @Bean
    public Queue dicomQueue() {
        return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
    }

    //初始化交換機
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();
    }

    // 將隊列與交換機按照路由規則進行綁定
    @Bean
    Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
    }

複製代碼

使用

隊列的使用:一個是發送,屬於生產者;一個是監聽,屬於消費者.

生產者實現

在mq配置模板類中,專門實現了一個發送類,發送文件內容,直接調用發送接口便可。

@Autowired
    RabbitService rabbitService;

    /** * 練習 發送數據到 mq中 * 1. 發送的數據會到 mq中 * 2. 咱們配置的 listener 是用來消費消息的 * 3. 客戶端配置 能夠參考 RabbitClientConfig * @param name 名字編號 * @param vo 實體內容 * @return: com.infervision.model.NameVo */
    @ApiOperation(value = "增長name信息", notes = "實體信息")
    @PostMapping(value = "/{name}")
    @ApiImplicitParam(paramType = "query", name = "name", value = "用戶名字", required = true, dataType = "string")
    public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
        rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
        return vo;
    }


   @Service
public class RabbitServiceImpl implements RabbitService {

    @Autowired
    MsgSender msgSender;

    /** * 嘗試發送 message 到mq中 * @param message * @return: void */
    @Override
    public void sendMessage(String exchange, String routingKey,String message) {
        msgSender.sendMsg(exchange, routingKey, message);
    }
}

複製代碼

消費者實現

消費者實現有兩種方式,一種經過註解的方式監聽,一種是實現ChannelAwareMessageListener類來實現消費。

註解實現監聽

//在方法上進行注入。配置工廠幫助提升單個消費者一次性消費的消息數量,設置多少個消費者,用來提升程序的性能
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")
    public void processDicomMessage(Message message, Channel channel) {
            logger.info(message);
    }

// 工廠能夠在配置模板類中中配置好。
@Bean("multipleConsumerContainerFactory")
    public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(50);
        factory.setConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
複製代碼

實現接口方式

/** * 建立監聽器。 * @author fruiqi * @date 19-2-11 下午4:18 * @param labelStatisticsListener 監聽器 * 調用咱們公用的方法 **/
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {
        SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer(「queue_name」,
                true, rabbitProperties.getMaximumDelivery(),
                rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
        return container;
    }


@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {


    private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);

    /** * 處理傳輸過來的數據 * @param message 傳送的消息內容 * @param channel 實現通道 * @return: void */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String mes = new String(message.getBody());
        logger.info("[INFO] message is {}",mes);

        // 手動應答 消息已消費
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

複製代碼

總結

以上內容就完成了rabbitmq 從搭建到使用所有的流程。固然裏面還有更多的可讓咱們去探討,好比mq的隊列模式,一個系統配置多個mq等等內容。敬請期待咱們下一篇mq系列內容。

你們在系統中使用過mq嗎?大家使用的mq是什麼樣的?能夠在留言區咱們一塊兒探討哦。

代碼存放在:github中

·END·

路雖遠,行則必至

本文原發於 同名微信公衆號「胖琪的升級之路」,回覆「1024」你懂得,給個讚唄。

微信ID:YoungRUIQ

公衆號
相關文章
相關標籤/搜索