從頭開始搭建一個Spring boot+RabbitMQ環境

消息隊列在目前分佈式系統下具有很是重要的地位,以下的場景是比較適合消息隊列的:html

  • 跨系統的調用,異步性質的調用最佳。
  • 高併發問題,利用隊列串行特色。
  • 訂閱模式,數據被未知數量的消費者訂閱,好比某種數據的變動會影響多個系統的數據,訂單數據就是比較好理解的。

以前有一個場景是商品數據在修改後須要推送到elasticsearch中,因爲修改產品的併發量以及數據量均不大,因此對於消息未作持久化,並且爲了快速上線簡化系統,生產者與消費者更是部署在一個應用中,自生產自消費。這篇將從頭搭建RabbitMQ環境,而且將之集成在Spring boot中。java

搭建RabbitMQ環境

erlang

因爲RabbitMQ是基於erlang開發的,因此要安裝RabbitMQ先必須安裝erlang。git

更換軟件源github

使用apt-get時默認的軟件源是us.archive.ubuntu.com,這會常常發生安裝問題,好比速度特別慢或者因爲下載不了形成不能安裝。web

能夠更換成國內的數據源cn.archive.ubuntu.com,速度那是不用說的了(這裏感謝個人同事的提醒)。找到下面這個文件而後進行替換。spring

/etc/apt/sources.list

:%s/us.archive/cn.archive/g 

在沒有更新軟件源時,我採起的是源碼編譯安裝方法,參考這篇文章。我安裝的是最新19.2版本,安裝過程當中還遇到各類問題就不一一記錄了。數據庫

http://erlang.org/doc/installation_guide/INSTALL.htmlcanvas

測試erlang安裝是否正確,輸入erl,若是看到以下圖所示就說明安裝成功了。ubuntu

安裝RabbitMQ

在未更換軟件源以前我也是選擇了源碼編譯安裝方法,安裝的最新的3.6.6,但手動啓動時老是不成功,錯誤信息以下:ruby

版本問題

RabbitMQ 3.6.6+ erlang 19.2 啓動失敗的問題暫時未解決,有誰知道的能夠告訴我。

因爲啓動不成功,最後在更新成國內軟件源以後,再次經過 apt-get 安裝RabbitMQ,默認的版本是3.5.7,好像也能夠選版本,之後再嘗試。可喜的是經過apt-get安裝的RabbitMQ成功的啓動起來了。能夠經過以下命令查看RabbitMQ狀態。

./rabbitmqctl stauts
RabbitMQ管理工具

這是自帶的一個web插件,能夠用來管理消息隊列,啓動它的方法比較簡單:

rabbitmq-plugins enable rabbitmq_management

而後重啓RabbitMQ便可生效。默認生成了guest用戶,但這個guest用戶只能在RabbitMQ所在主機才能訪問,因此要想遠程訪問就須要從新分配一個用戶,有兩種辦法:

  • 經過網頁,以guest登陸而後在頁面上完成操做。
  • 經過命令,建立用戶,受權也能夠。

建立用戶,指定用戶名以及密碼

./rabbitmqctl add_user root root //用戶名密碼都是root

分配角色,administrator是能夠操做和guest本地用戶同樣的功能,當登陸上rabbitmq_management以後,裏面的全部功能均可以使用。

rabbitmqctl set_user_tags root administrator

受權,隊列的操做管理權限。若是不配置,那麼客戶端在鏈接消息隊列時會出問題。

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

上面語句我沒有執行成功,後續再研究下是否是寫法問題

Spring boot集成RabbitMQ

咱們在rabbitmq_management上面能夠正常訪問操做後,就能夠放心的寫demo了,這裏採用spring boot。先看簡單看下RabbitMQ的簡易架構圖,容易理解下面提到的一些組件。

    • 生產者,消息,消費者

 

    • 消息內部:Exchange,Binding,Queues

引用amqp的starter

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

增長配置信息

這裏沒有采用自動配置

mq.rabbit.host=192.168.21.128
mq.rabbit.port=5672
mq.rabbit.virtualHost=/
mq.rabbit.username=root
mq.rabbit.password=root

建立RabbitMQConfig

  • ConnectionFactory,相似於數據庫鏈接等。
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);

    connectionFactory.setUsername(this.mqRabbitUserName);
    connectionFactory.setPassword(this.mqRabbitPassword);
    connectionFactory.setVirtualHost(this.mqRabbitVirtualHost);
    connectionFactory.setPublisherConfirms(true);

    return connectionFactory;
}
  • RabbitTemplate,用來發送消息。
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    return template;
}
  • DirectExchange
@Bean
public DirectExchange defaultExchange() {
    return new DirectExchange(EXCHANGE_NAME);
}
  • Queue,構建隊列,名稱,是否持久化之類
@Bean
public Queue queue() {
    return new Queue(QUEUE_NAME, true);
}
  • Binding,將DirectExchange與Queue進行綁定
@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);
}
  • SimpleMessageListenerContainer,消費者

須要將ACK修改成手動確認,避免消息在處理過程當中發生異常形成被誤認爲已經成功消費的假象。

@Bean
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(queue());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setMessageListener(new ChannelAwareMessageListener() {

        public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
            byte[] body = message.getBody();
            logger.info("消費端接收到消息 : " + new String(body));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    });
    return container;
}

服務端,業務邏輯,調用消息隊列。

爲了讓客戶端知道消息是否已經成功,消息隊列提供了回調機制(須要實現ConfirmCallback),當消息服務器接收到消息以後會給客戶端一個通知,此時客戶端根據消息應答來決定後續的流程。

@Service
public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {

    @Autowired
    private ProductMapper productMapper;

    private RabbitTemplate rabbitTemplate;

    public ProductServiceImpl(RabbitTemplate rabbitTemplate){
        this.rabbitTemplate=rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        this.logger.info(" 消息id:" + correlationData);
        if (ack) {
            this.logger.info("消息發送確認成功");
        } else {
            this.logger.info("消息發送確認失敗:" + cause);

        }
    }

    @Override
    public void save(Product product) {

        //執行保存
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId);
    }
}

執行結果

能夠清晰的看到RabbitMQ發給生產者的信息收到的確認信息,也能看到消息被消費端消費後的信息。

RabbitMQ的其它方面

高可用方案

與常見的數據庫相似,都是主從模式來保證高可用,能夠利用HAProxy來實現主從備份方案。

水平擴展方案

主要是爲了解決垂直優化的瓶頸問題,主要有這三種:

  • clustering,這是默認內置的一種集羣模式,與下面兩種不一樣的是clustering通常應用於同一局域網。
  • federation,有待後續學習
  • shovel,有待後續學習

不丟消息特性

這個不是RabbitMQ的專利,將消息持久化能夠確保RabbitMQ重啓或者死機過程當中不至於丟掉沒有消費的消息。

消息不被重複消費

這點要靠消費端來完成,儘管消費端能夠經過ACK來通知消息隊列消息已經被消費,但若是消費端消費了消息,此時ACK過程當中的通知出現異常,消息隊列會認爲消息未被消費會繼續發給消費端。

總結

初次安裝可能會出現一堆問題,特別是須要安裝所依賴的衆多包。RabbitMQ與Erlang可能存在版本依賴問題待後續確認。spring boot下集成RabbitMQ異常簡單,能夠根據需求部署集羣來實現可擴展高可用的消息系統。

引用

相關文章
相關標籤/搜索